# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Structured, daemon tree service management.

"""
from typing import Optional, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict

from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus

from .log import get_logger, get_console_log
from .brokers import get_brokermod

log = get_logger(__name__)

_root_dname = 'pikerd'

_registry_addr = ('127.0.0.1', 6116)
_tractor_kwargs: dict[str, Any] = {
    # use a different registry addr than tractor's default
    'arbiter_addr': _registry_addr
}
_root_modules = [
    __name__,
    'piker.clearing._ems',
    'piker.clearing._client',
]


class Services(Struct):

    actor_n: tractor._supervise.ActorNursery
    service_n: trio.Nursery
    debug_mode: bool  # tractor sub-actor debug mode flag
    service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}

    async def start_service_task(
        self,
        name: str,
        portal: tractor.Portal,
        target: Callable,
        **kwargs,

    ) -> tuple[trio.CancelScope, Any]:
        '''
        Open a context in a service sub-actor and add it to a stack
        that gets unwound at ``pikerd`` teardown.

        This allows for allocating long-running sub-services in our main
        daemon and explicitly controlling their lifetimes.

        '''
        async def open_context_in_task(
            task_status: TaskStatus[
                trio.CancelScope] = trio.TASK_STATUS_IGNORED,

        ) -> Any:

            with trio.CancelScope() as cs:
                async with portal.open_context(
                    target,
                    **kwargs,

                ) as (ctx, first):

                    # unblock once the remote context has started
                    task_status.started((cs, first))
                    log.info(
                        f'`pikerd` service {name} started with value {first}'
                    )
                    try:
                        # wait on the context's return value
                        ctx_res = await ctx.result()
                    except tractor.ContextCancelled:
                        return await self.cancel_service(name)
                    else:
                        # wait on any error from the sub-actor
                        # NOTE: this will block indefinitely until
                        # cancelled either by error from the target
                        # context function or by being cancelled here by
                        # the surrounding cancel scope
                        return (await portal.result(), ctx_res)

        cs, first = await self.service_n.start(open_context_in_task)

        # store the cancel scope and portal for later cancellation or
        # restart if needed.
        self.service_tasks[name] = (cs, portal)

        return cs, first

    # TODO: per-service cancellation by scope, we aren't using this
    # anywhere right?
    async def cancel_service(
        self,
        name: str,

    ) -> Any:
        log.info(f'Cancelling `pikerd` service {name}')
        cs, portal = self.service_tasks[name]
        # XXX: not entirely sure why this is required,
        # and should probably be better fine tuned in
        # ``tractor``?
        cs.cancel()
        return await portal.cancel_actor()


_services: Optional[Services] = None
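
# XXX: a minimal sketch of how the ``Services`` api above is meant to
# be driven from inside ``pikerd``, kept commented-out since the names
# ``mydaemond`` and ``_setup_persistent_mydaemond`` are purely
# hypothetical; see ``spawn_brokerd()`` and ``spawn_emsd()`` below for
# the real callers.
#
# async def spawn_mydaemond(
#     loglevel: Optional[str] = None,
# ) -> bool:
#
#     global _services
#     assert _services
#
#     # ask the ``pikerd``-owned actor nursery for a new sub-actor
#     portal = await _services.actor_n.start_actor(
#         'mydaemond',
#         enable_modules=['piker.mydaemond'],  # hypothetical module
#         loglevel=loglevel,
#         debug_mode=_services.debug_mode,
#     )
#
#     # open a long-lived context task in the sub-actor which is
#     # unwound at ``pikerd`` teardown
#     await _services.start_service_task(
#         'mydaemond',
#         portal,
#         _setup_persistent_mydaemond,  # hypothetical ctx entrypoint
#     )
#     return True
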
@acm
async def open_pikerd(
    start_method: str = 'trio',
    loglevel: Optional[str] = None,

    # XXX: you should pretty much never want debug mode
    # for data daemons when running in production.
    debug_mode: bool = False,

) -> Optional[tractor._portal.Portal]:
    '''
    Start a root piker daemon whose lifetime extends indefinitely
    until cancelled.

    A root actor nursery is created which can be used to create and keep
    alive underling services (see below).

    '''
    global _services
    assert _services is None

    # XXX: this may open a root actor as well
    async with (
        tractor.open_root_actor(

            # passed through to ``open_root_actor``
            arbiter_addr=_registry_addr,
            name=_root_dname,
            loglevel=loglevel,
            debug_mode=debug_mode,
            start_method=start_method,

            # TODO: eventually we should be able to avoid
            # having the root have more than the permissions to
            # spawn other specialized daemons I think?
            enable_modules=_root_modules,

        ) as _,

        tractor.open_nursery() as actor_nursery,
    ):
        async with trio.open_nursery() as service_nursery:

            # # setup service mngr singleton instance
            # async with AsyncExitStack() as stack:

            # assign globally for future daemon/task creation
            _services = Services(
                actor_n=actor_nursery,
                service_n=service_nursery,
                debug_mode=debug_mode,
            )

            yield _services


@acm
async def open_piker_runtime(
    name: str,
    enable_modules: list[str] = [],
    start_method: str = 'trio',
    loglevel: Optional[str] = None,

    # XXX: you should pretty much never want debug mode
    # for data daemons when running in production.
    debug_mode: bool = False,

) -> Optional[tractor._portal.Portal]:
    '''
    Start a piker actor whose runtime will automatically sync with
    existing piker actors on the local link based on configuration.

    '''
    global _services
    assert _services is None

    # XXX: this may open a root actor as well
    async with (
        tractor.open_root_actor(

            # passed through to ``open_root_actor``
            arbiter_addr=_registry_addr,
            name=name,
            loglevel=loglevel,
            debug_mode=debug_mode,
            start_method=start_method,

            # TODO: eventually we should be able to avoid
            # having the root have more than the permissions to
            # spawn other specialized daemons I think?
            enable_modules=_root_modules + enable_modules,

        ) as _,
    ):
        yield tractor.current_actor()


@acm
async def maybe_open_runtime(
    loglevel: Optional[str] = None,
    **kwargs,

) -> None:
    """
    Start the ``tractor`` runtime (a root actor) if none exists.

    """
    settings = _tractor_kwargs
    settings.update(kwargs)

    if not tractor.current_actor(err_on_no_runtime=False):
        async with tractor.open_root_actor(
            loglevel=loglevel,
            **settings,
        ):
            yield
    else:
        yield


@acm
async def maybe_open_pikerd(
    loglevel: Optional[str] = None,
    **kwargs,

) -> Optional[tractor._portal.Portal]:
    """
    If no ``pikerd`` daemon-root-actor can be found, start it and yield
    up (we should probably figure out returning a portal to self
    though).

    """
    if loglevel:
        get_console_log(loglevel)

    # subtle, we must have the runtime up here or portal lookup will fail
    async with maybe_open_runtime(loglevel, **kwargs):

        async with tractor.find_actor(_root_dname) as portal:
            # assert portal is not None
            if portal is not None:
                yield portal
                return

    # presume pikerd role since no daemon could be found at
    # configured address
    async with open_pikerd(
        loglevel=loglevel,
        debug_mode=kwargs.get('debug_mode', False),

    ) as _:
        # in the case where we're starting up the
        # tractor-piker runtime stack in **this** process
        # we return no portal to self.
        yield None
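
# XXX: a minimal sketch of the branching a ``maybe_open_pikerd()``
# caller must handle; not executed here and the task bodies are
# elided, the ``'info'`` loglevel is only illustrative.
#
# async def _client_entrypoint():
#     async with maybe_open_pikerd(loglevel='info') as portal:
#         if portal is None:
#             # **this** process just became the root ``pikerd``
#             # actor; services are started locally via the global
#             # ``_services`` singleton.
#             ...
#         else:
#             # an already running ``pikerd`` was found; ask it to
#             # spawn/manage services remotely through the portal.
#             ...
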
# brokerd enabled modules
_data_mods = [
    'piker.brokers.core',
    'piker.brokers.data',
    'piker.data',
    'piker.data.feed',
    'piker.data._sampling'
]


class Brokerd:
    locks = defaultdict(trio.Lock)


@acm
async def find_service(
    service_name: str,
) -> Optional[tractor.Portal]:

    log.info(f'Scanning for service `{service_name}`')
    # attach to existing daemon by name if possible
    async with tractor.find_actor(
        service_name,
        arbiter_sockaddr=_registry_addr,
    ) as maybe_portal:
        yield maybe_portal


async def check_for_service(
    service_name: str,

) -> Optional[tuple[str, int]]:
    '''
    Service daemon "liveness" predicate; returns the daemon's socket
    address if it is up, ``None`` otherwise.

    '''
    async with tractor.query_actor(
        service_name,
        arbiter_sockaddr=_registry_addr,
    ) as sockaddr:
        return sockaddr


@acm
async def maybe_spawn_daemon(

    service_name: str,
    service_task_target: Callable,
    spawn_args: dict[str, Any],
    loglevel: Optional[str] = None,
    **kwargs,

) -> tractor.Portal:
    '''
    If no ``service_name`` daemon-actor can be found, spawn one in a
    local subactor and return a portal to it.

    If this function is called from a non-``pikerd`` actor, the spawned
    service will persist as long as pikerd does or until it is requested
    to be cancelled.

    This can be seen as a service-starting api for remote-actor clients.

    '''
    if loglevel:
        get_console_log(loglevel)

    # serialize access to this section to avoid
    # 2 or more tasks racing to create a daemon
    lock = Brokerd.locks[service_name]
    await lock.acquire()

    async with find_service(service_name) as portal:
        if portal is not None:
            lock.release()
            yield portal
            return

    log.warning(f"Couldn't find any existing {service_name}")

    # ask the root ``pikerd`` daemon to spawn the daemon we need;
    # if pikerd is not live we now become the root of the
    # process tree
    async with maybe_open_pikerd(
        loglevel=loglevel,
        **kwargs,

    ) as pikerd_portal:

        if pikerd_portal is None:
            # we are the root and thus are `pikerd`
            # so spawn the target service directly by calling
            # the provided target routine.
            # XXX: this assumes that the target is well formed and will
            # do the right things to setup both a sub-actor **and** call
            # the ``_Services`` api from above to start the top level
            # service task for that actor.
            await service_task_target(**spawn_args)

        else:
            # tell the remote `pikerd` to start the target,
            # the target can't return a non-serializable value
            # since it is expected that service starting is
            # non-blocking and the target task will persist running
            # on `pikerd` after the client requesting its start
            # disconnects.
            await pikerd_portal.run(
                service_task_target,
                **spawn_args,
            )

        async with tractor.wait_for_actor(service_name) as portal:
            lock.release()
            yield portal
            await portal.cancel_actor()
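
# XXX: a minimal sketch of wrapping ``maybe_spawn_daemon()`` to expose
# a new client-facing service helper, mirroring ``maybe_spawn_brokerd()``
# below; ``spawn_mydaemond`` refers to the hypothetical spawn target
# sketched further above.
#
# @acm
# async def maybe_spawn_mydaemond(
#     loglevel: Optional[str] = None,
#     **kwargs,
#
# ) -> tractor.Portal:
#
#     async with maybe_spawn_daemon(
#         'mydaemond',
#         service_task_target=spawn_mydaemond,
#         spawn_args={'loglevel': loglevel},
#         loglevel=loglevel,
#         **kwargs,
#
#     ) as portal:
#         yield portal
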
async def spawn_brokerd(

    brokername: str,
    loglevel: Optional[str] = None,
    **tractor_kwargs,

) -> bool:

    log.info(f'Spawning {brokername} broker daemon')

    brokermod = get_brokermod(brokername)
    dname = f'brokerd.{brokername}'

    extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
    tractor_kwargs.update(extra_tractor_kwargs)

    global _services
    assert _services

    # ask `pikerd` to spawn a new sub-actor and manage it under its
    # actor nursery
    modpath = brokermod.__name__
    broker_enable = [modpath]
    for submodname in getattr(
        brokermod,
        '__enable_modules__',
        [],
    ):
        subpath = f'{modpath}.{submodname}'
        broker_enable.append(subpath)

    portal = await _services.actor_n.start_actor(
        dname,
        enable_modules=_data_mods + broker_enable,
        loglevel=loglevel,
        debug_mode=_services.debug_mode,
        **tractor_kwargs
    )

    # non-blocking setup of brokerd service nursery
    from .data import _setup_persistent_brokerd

    await _services.start_service_task(
        dname,
        portal,
        _setup_persistent_brokerd,
        brokername=brokername,
    )
    return True


@acm
async def maybe_spawn_brokerd(

    brokername: str,
    loglevel: Optional[str] = None,
    **kwargs,

) -> tractor.Portal:
    '''
    Helper to spawn a brokerd service *from* a client who wishes to
    use the sub-actor-daemon.

    '''
    async with maybe_spawn_daemon(

        f'brokerd.{brokername}',
        service_task_target=spawn_brokerd,
        spawn_args={'brokername': brokername, 'loglevel': loglevel},
        loglevel=loglevel,
        **kwargs,

    ) as portal:
        yield portal


async def spawn_emsd(

    loglevel: Optional[str] = None,
    **extra_tractor_kwargs

) -> bool:
    """
    Start the clearing engine under ``pikerd``.

    """
    log.info('Spawning emsd')

    global _services
    assert _services

    portal = await _services.actor_n.start_actor(
        'emsd',
        enable_modules=[
            'piker.clearing._ems',
            'piker.clearing._client',
        ],
        loglevel=loglevel,
        debug_mode=_services.debug_mode,  # set by pikerd flag
        **extra_tractor_kwargs
    )

    # non-blocking setup of clearing service
    from .clearing._ems import _setup_persistent_emsd

    await _services.start_service_task(
        'emsd',
        portal,
        _setup_persistent_emsd,
    )
    return True


@acm
async def maybe_open_emsd(

    brokername: str,
    loglevel: Optional[str] = None,
    **kwargs,

) -> tractor._portal.Portal:  # noqa

    async with maybe_spawn_daemon(

        'emsd',
        service_task_target=spawn_emsd,
        spawn_args={'loglevel': loglevel},
        loglevel=loglevel,
        **kwargs,

    ) as portal:
        yield portal


# TODO: ideally we can start the tsdb "on demand" but it's
# probably going to require "rootless" docker, at least if we don't
# want to expect the user to start ``pikerd`` with root perms all the
# time.
# async def maybe_open_marketstored(
#     loglevel: Optional[str] = None,
#     **kwargs,
#
# ) -> tractor._portal.Portal:  # noqa
#
#     async with maybe_spawn_daemon(
#         'marketstored',
#         service_task_target=spawn_emsd,
#         spawn_args={'loglevel': loglevel},
#         loglevel=loglevel,
#         **kwargs,
#
#     ) as portal:
#         yield portal
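
# XXX: a minimal end-to-end client usage sketch; the ``'kraken'``
# backend name and ``'info'`` loglevel are only illustrative.
#
# async def main():
#     # implicitly starts ``pikerd`` (if none is found) and then a
#     # ``brokerd.kraken`` sub-actor, yielding a portal to it.
#     async with maybe_spawn_brokerd('kraken', loglevel='info') as portal:
#         # open streams/contexts on the daemon via the portal here
#         ...
#
# trio.run(main)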