Start forming an services api
Add a `Services` nurseries container singleton for spawning sub-daemons inside the long running `pikerd` tree. Bring in `brokerd` spawning util funcs to start getting eyes on what can be factored into a service api.supervise
							parent
							
								
									38471b7b34
								
							
						
					
					
						commit
						2a51582ec0
					
				
							
								
								
									
										137
									
								
								piker/_daemon.py
								
								
								
								
							
							
						
						
									
										137
									
								
								piker/_daemon.py
								
								
								
								
							|  | @ -1,9 +1,12 @@ | |||
| """ | ||||
| pikerd daemon lifecylcle & rpc | ||||
| """ | ||||
| from typing import Optional | ||||
| from typing import Optional, Union | ||||
| from contextlib import asynccontextmanager | ||||
| from functools import partial | ||||
| 
 | ||||
| from pydantic import BaseModel | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| from .log import get_logger, get_console_log | ||||
|  | @ -12,7 +15,6 @@ from .brokers import get_brokermod | |||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| _root_nursery: Optional[tractor._trionics.ActorNursery] = None | ||||
| _root_dname = 'pikerd' | ||||
| _root_modules = [ | ||||
|     __name__, | ||||
|  | @ -21,17 +23,39 @@ _root_modules = [ | |||
| ] | ||||
| 
 | ||||
| 
 | ||||
| # @dataclass | ||||
| class Services(BaseModel): | ||||
|     actor_n: tractor._trionics.ActorNursery | ||||
|     service_n: trio.Nursery | ||||
| 
 | ||||
|     class Config: | ||||
|         arbitrary_types_allowed = True | ||||
| 
 | ||||
| 
 | ||||
| _services: Optional[Services] = None | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def open_pikerd( | ||||
|     loglevel: Optional[str] = None, | ||||
|     **kwargs, | ||||
| ) -> Optional[tractor._portal.Portal]: | ||||
|     """Start a root piker daemon who's lifetime extends indefinitely | ||||
|     until cancelled. | ||||
| 
 | ||||
|     global _root_nursery | ||||
|     A root actor nursery is created which can be used to create and keep | ||||
|     alive underling services (see below). | ||||
| 
 | ||||
|     """ | ||||
|     global _services | ||||
|     assert _services is None | ||||
| 
 | ||||
|     # XXX: this may open a root actor as well | ||||
|     async with tractor.open_nursery( | ||||
| 
 | ||||
|         # passed through to ``open_root_actor`` | ||||
|         name=_root_dname, | ||||
|         loglevel=loglevel, | ||||
| 
 | ||||
|         # TODO: eventually we should be able to avoid | ||||
|         # having the root have more then permissions to | ||||
|  | @ -39,23 +63,28 @@ async def open_pikerd( | |||
|         # enable_modules=[__name__], | ||||
|         enable_modules=_root_modules, | ||||
| 
 | ||||
|         loglevel=loglevel, | ||||
|     ) as nursery: | ||||
|         _root_nursery = nursery | ||||
|         yield nursery | ||||
|     ) as actor_nursery: | ||||
|         async with trio.open_nursery() as service_nursery: | ||||
| 
 | ||||
|             # assign globally for future daemon/task creation | ||||
|             _services = Services( | ||||
|                 actor_n=actor_nursery, | ||||
|                 service_n=service_nursery | ||||
|             ) | ||||
| 
 | ||||
|             yield _services | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def maybe_open_pikerd( | ||||
|     loglevel: Optional[str] = None, | ||||
|     **kwargs, | ||||
| ) -> Optional[tractor._portal.Portal]: | ||||
|     """If no ``pikerd`` daemon-root-actor can be found, | ||||
|     assume that role and return a portal to myself | ||||
| ) -> Union[tractor._portal.Portal, Services]: | ||||
|     """If no ``pikerd`` daemon-root-actor can be found start it and | ||||
|     yield up (we should probably figure out returning a portal to self | ||||
|     though). | ||||
| 
 | ||||
|     """ | ||||
|     global _root_nursery | ||||
| 
 | ||||
|     if loglevel: | ||||
|         get_console_log(loglevel) | ||||
| 
 | ||||
|  | @ -72,13 +101,10 @@ async def maybe_open_pikerd( | |||
|     async with open_pikerd( | ||||
|         loglevel, | ||||
|         **kwargs, | ||||
|     ): | ||||
|         assert _root_nursery | ||||
| 
 | ||||
|     ) as _: | ||||
|         # in the case where we're starting up the | ||||
|         # tractor-piker runtime stack in **this** process | ||||
|         # we want to hand off a nursery for starting (as a sub) | ||||
|         # whatever actor is requesting pikerd. | ||||
|         # we return no portal to self. | ||||
|         yield None | ||||
| 
 | ||||
| 
 | ||||
|  | @ -97,6 +123,8 @@ async def spawn_brokerd( | |||
|     **tractor_kwargs | ||||
| ) -> tractor._portal.Portal: | ||||
| 
 | ||||
|     from .data import _setup_persistent_feeds | ||||
| 
 | ||||
|     log.info(f'Spawning {brokername} broker daemon') | ||||
| 
 | ||||
|     brokermod = get_brokermod(brokername) | ||||
|  | @ -105,19 +133,82 @@ async def spawn_brokerd( | |||
|     extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) | ||||
|     tractor_kwargs.update(extra_tractor_kwargs) | ||||
| 
 | ||||
|     # TODO: raise exception when _root_nursery == None? | ||||
|     global _root_nursery | ||||
|     global _services | ||||
|     assert _services | ||||
| 
 | ||||
|     await _root_nursery.start_actor( | ||||
|     portal = await _services.actor_n.start_actor( | ||||
|         dname, | ||||
|         enable_modules=_data_mods + [brokermod.__name__], | ||||
|         loglevel=loglevel, | ||||
|         **tractor_kwargs | ||||
|     ) | ||||
| 
 | ||||
|     # TODO: so i think this is the perfect use case for supporting | ||||
|     # a cross-actor async context manager api instead of this | ||||
|     # shoort-and-forget task spawned in the root nursery, we'd have an | ||||
|     # async exit stack that we'd register the `portal.open_context()` | ||||
|     # call with and then have the ability to unwind the call whenevs. | ||||
| 
 | ||||
|     # non-blocking setup of brokerd service nursery | ||||
|     _services.service_n.start_soon( | ||||
|         partial( | ||||
|             portal.run, | ||||
|             _setup_persistent_feeds, | ||||
|             brokername=brokername, | ||||
|         ) | ||||
|     ) | ||||
| 
 | ||||
|     return dname | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def maybe_spawn_brokerd( | ||||
|     brokername: str, | ||||
|     loglevel: Optional[str] = None, | ||||
| 
 | ||||
|     # XXX: you should pretty much never want debug mode | ||||
|     # for data daemons when running in production. | ||||
|     debug_mode: bool = True, | ||||
| ) -> tractor._portal.Portal: | ||||
|     """If no ``brokerd.{brokername}`` daemon-actor can be found, | ||||
|     spawn one in a local subactor and return a portal to it. | ||||
| 
 | ||||
|     """ | ||||
|     if loglevel: | ||||
|         get_console_log(loglevel) | ||||
| 
 | ||||
|     dname = f'brokerd.{brokername}' | ||||
| 
 | ||||
|     # attach to existing brokerd if possible | ||||
|     async with tractor.find_actor(dname) as portal: | ||||
|         if portal is not None: | ||||
|             yield portal | ||||
|             return | ||||
| 
 | ||||
|     # ask root ``pikerd`` daemon to spawn the daemon we need if | ||||
|     # pikerd is not live we now become the root of the | ||||
|     # process tree | ||||
|     async with maybe_open_pikerd( | ||||
|         loglevel=loglevel | ||||
|     ) as pikerd_portal: | ||||
| 
 | ||||
|         if pikerd_portal is None: | ||||
|             # we are root so spawn brokerd directly in our tree | ||||
|             # the root nursery is accessed through process global state | ||||
|             await spawn_brokerd(brokername, loglevel=loglevel) | ||||
| 
 | ||||
|         else: | ||||
|             await pikerd_portal.run( | ||||
|                 spawn_brokerd, | ||||
|                 brokername=brokername, | ||||
|                 loglevel=loglevel, | ||||
|                 debug_mode=debug_mode, | ||||
|             ) | ||||
| 
 | ||||
|         async with tractor.wait_for_actor(dname) as portal: | ||||
|             yield portal | ||||
| 
 | ||||
| 
 | ||||
| async def spawn_emsd( | ||||
|     brokername, | ||||
|     loglevel: Optional[str] = None, | ||||
|  | @ -126,10 +217,10 @@ async def spawn_emsd( | |||
| 
 | ||||
|     log.info('Spawning emsd') | ||||
| 
 | ||||
|     # TODO: raise exception when _root_nursery == None? | ||||
|     global _root_nursery | ||||
|     # TODO: raise exception when _services == None? | ||||
|     global _services | ||||
| 
 | ||||
|     await _root_nursery.start_actor( | ||||
|     await _services.actor_n.start_actor( | ||||
|         'emsd', | ||||
|         enable_modules=[ | ||||
|             'piker.clearing._ems', | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue