diff --git a/piker/_daemon.py b/piker/_daemon.py index 9fd354cf..c792d669 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -1,7 +1,7 @@ """ pikerd daemon lifecylcle & rpc """ -from typing import Optional +from typing import Optional, Dict, Callable from contextlib import asynccontextmanager import tractor @@ -14,14 +14,41 @@ log = get_logger(__name__) _root_nursery: Optional[tractor._trionics.ActorNursery] = None _root_dname = 'pikerd' - _root_modules = [ __name__, - 'piker.exchange._ems', - 'piker.exchange._client', + 'piker.clearing._ems', + 'piker.clearing._client', ] +@asynccontextmanager +async def open_pikerd( + loglevel: Optional[str] = None +) -> Optional[tractor._portal.Portal]: + + global _root_nursery + + async with tractor.open_root_actor( + name=_root_dname, + loglevel=loglevel, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + # enable_modules=[__name__], + enable_modules=_root_modules + ): + # init root nursery + try: + async with tractor.open_nursery() as nursery: + _root_nursery = nursery + yield nursery + finally: + # client code may block indefinitely so cancel when + # teardown is invoked + await nursery.cancel() + + @asynccontextmanager async def maybe_open_pikerd( loglevel: Optional[str] = None @@ -41,20 +68,12 @@ async def maybe_open_pikerd( yield portal else: # assume role - async with tractor.open_root_actor( - name=_root_dname, - loglevel=loglevel, - enable_modules=_root_modules - ): - # init root nursery - try: - async with tractor.open_nursery() as nursery: - _root_nursery = nursery - yield None - finally: - # client code may block indefinitely so cancel when - # teardown is invoked - await nursery.cancel() + async with open_pikerd(loglevel) as nursery: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we want to hand off a nursery for starting (as a sub) + # whatever actor is requesting pikerd. + yield None # brokerd enabled modules @@ -72,10 +91,12 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: + log.info(f'Spawning {brokername} broker daemon') + brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' - log.info(f'Spawning {brokername} broker daemon') - tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) + + extra_tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) # TODO: raise exception when _root_nursery == None? global _root_nursery @@ -83,6 +104,32 @@ async def spawn_brokerd( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, - **tractor_kwargs + **extra_tractor_kwargs ) - return portal + return dname + + +async def spawn_emsd( + brokername, + loglevel: Optional[str] = None, + **extra_tractor_kwargs +) -> tractor._portal.Portal: + + from .clearing import _client + + log.info('Spawning emsd') + + # TODO: raise exception when _root_nursery == None? + global _root_nursery + assert _root_nursery + + portal = await _root_nursery.start_actor( + 'emsd', + enable_modules=[ + 'piker.clearing._ems', + 'piker.clearing._client', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + return 'emsd'