From 7fb2c95ef1b3a30ad098a1e32b9eafe7bfe724c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Jun 2021 06:56:55 -0400 Subject: [PATCH] Factor daemon spawning logic, use it to spawn emsd --- piker/_daemon.py | 135 +++++++++++++++++++++++++------------- piker/clearing/_client.py | 26 +------- 2 files changed, 91 insertions(+), 70 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 07a584c3..799b1331 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -197,6 +197,66 @@ _data_mods = [ ] +class Brokerd: + locks = defaultdict(trio.Lock) + + +@asynccontextmanager +async def maybe_spawn_daemon( + + service_name: str, + spawn_func: Callable, + spawn_args: dict[str, Any], + # brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor.Portal: + """ + If no ``service_name`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + """ + if loglevel: + get_console_log(loglevel) + + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Brokerd.locks[service_name] + await lock.acquire() + + # attach to existing brokerd if possible + async with tractor.find_actor(service_name) as portal: + if portal is not None: + lock.release() + yield portal + return + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel, + **kwargs, + ) as pikerd_portal: + + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + # await spawn_brokerd(brokername, loglevel=loglevel) + await spawn_func(**spawn_args) + + else: + await pikerd_portal.run( + spawn_func, + **spawn_args, + ) + + async with tractor.wait_for_actor(service_name) as portal: + lock.release() + yield portal + + async def spawn_brokerd( brokername: str, @@ -242,10 +302,6 @@ async def spawn_brokerd( return dname -class Brokerd: - locks = defaultdict(trio.Lock) - - @asynccontextmanager async def maybe_spawn_brokerd( @@ -253,52 +309,20 @@ async def maybe_spawn_brokerd( loglevel: Optional[str] = None, **kwargs, -) -> tractor._portal.Portal: - """ - If no ``brokerd.{brokername}`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. +) -> tractor.Portal: + '''Helper to spawn a brokerd service. - """ - if loglevel: - get_console_log(loglevel) + ''' + async with maybe_spawn_daemon( - dname = f'brokerd.{brokername}' - - # serialize access to this section to avoid - # 2 or more tasks racing to create a daemon - lock = Brokerd.locks[brokername] - await lock.acquire() - - # attach to existing brokerd if possible - async with tractor.find_actor(dname) as portal: - if portal is not None: - lock.release() - yield portal - return - - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( + f'brokerd.{brokername}', + spawn_func=spawn_brokerd, + spawn_args={'brokername': brokername, 'loglevel': loglevel}, loglevel=loglevel, **kwargs, - ) as pikerd_portal: - if pikerd_portal is None: - # we are root so spawn brokerd directly in our tree - # the root nursery is accessed through process global state - await spawn_brokerd(brokername, loglevel=loglevel) - - else: - await pikerd_portal.run( - spawn_brokerd, - brokername=brokername, - loglevel=loglevel, - ) - - async with tractor.wait_for_actor(dname) as portal: - lock.release() - yield portal + ) as portal: + yield portal async def spawn_emsd( @@ -328,3 +352,24 @@ async def spawn_emsd( **extra_tractor_kwargs ) return 'emsd' + + +@asynccontextmanager +async def maybe_open_emsd( + + brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor._portal.Portal: # noqa + + async with maybe_spawn_daemon( + + 'emsd', + spawn_func=spawn_emsd, + spawn_args={'brokername': brokername, 'loglevel': loglevel}, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index e881a726..316056be 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -30,6 +30,7 @@ import tractor from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main +from .._daemon import maybe_open_emsd log = get_logger(__name__) @@ -174,31 +175,6 @@ async def send_order_cmds(symbol_key: str): book._to_ems.send_nowait(cmd) -@asynccontextmanager -async def maybe_open_emsd( - brokername: str, -) -> tractor._portal.Portal: # noqa - - async with tractor.find_actor('emsd') as portal: - if portal is not None: - yield portal - return - - # ask remote daemon tree to spawn it - from .._daemon import spawn_emsd - - async with tractor.find_actor('pikerd') as portal: - assert portal - - name = await portal.run( - spawn_emsd, - brokername=brokername, - ) - - async with tractor.wait_for_actor(name) as portal: - yield portal - - @asynccontextmanager async def open_ems( broker: str,