From dcc60524cb8660eb4fff540696e796d22bbffbc9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 23 May 2021 10:53:57 -0400 Subject: [PATCH] Add remote context allocation api to service daemon This allows for more deterministically managing long running sub-daemon services under `pikerd` using the new context api from `tractor`. The contexts are allocated in an async exit stack and torn down at root daemon termination. Spawn brokerds using this method by changing the persistence entry point to be a `@tractor.context`. --- piker/_daemon.py | 72 +++++++++++++++++++++++++++++++++++----------- piker/data/feed.py | 13 +++++++-- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 1d98eee4..bf3f62a2 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,8 +19,9 @@ Structured, daemon tree service management. """ from functools import partial -from typing import Optional, Union -from contextlib import asynccontextmanager +from typing import Optional, Union, Callable +from contextlib import asynccontextmanager, AsyncExitStack +from collections import defaultdict from pydantic import BaseModel import trio @@ -44,10 +45,34 @@ class Services(BaseModel): actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag + ctx_stack: AsyncExitStack class Config: arbitrary_types_allowed = True + async def open_remote_ctx( + self, + portal: tractor.Portal, + target: Callable, + **kwargs, + + ) -> tractor.Context: + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` tearodwn. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + ctx, first = await self.ctx_stack.enter_async_context( + portal.open_context( + target, + **kwargs, + ) + ) + return ctx + _services: Optional[Services] = None @@ -62,14 +87,14 @@ async def open_pikerd( debug_mode: bool = False, ) -> Optional[tractor._portal.Portal]: - """ + ''' Start a root piker daemon who's lifetime extends indefinitely until cancelled. A root actor nursery is created which can be used to create and keep alive underling services (see below). - """ + ''' global _services assert _services is None @@ -91,14 +116,18 @@ async def open_pikerd( ) as _, tractor.open_nursery() as actor_nursery: async with trio.open_nursery() as service_nursery: - # assign globally for future daemon/task creation - _services = Services( - actor_n=actor_nursery, - service_n=service_nursery, - debug_mode=debug_mode, - ) + # setup service mngr singleton instance + async with AsyncExitStack() as stack: - yield _services + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery, + debug_mode=debug_mode, + ctx_stack=stack, + ) + + yield _services @asynccontextmanager @@ -195,17 +224,19 @@ async def spawn_brokerd( # call with and then have the ability to unwind the call whenevs. # non-blocking setup of brokerd service nursery - _services.service_n.start_soon( - partial( - portal.run, - _setup_persistent_brokerd, - brokername=brokername, - ) + await _services.open_remote_ctx( + portal, + _setup_persistent_brokerd, + brokername=brokername, ) return dname +class Brokerd: + locks = defaultdict(trio.Lock) + + @asynccontextmanager async def maybe_spawn_brokerd( @@ -224,9 +255,15 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Brokerd.locks[brokername] + await lock.acquire() + # attach to existing brokerd if possible async with tractor.find_actor(dname) as portal: if portal is not None: + lock.release() yield portal return @@ -251,6 +288,7 @@ async def maybe_spawn_brokerd( ) async with tractor.wait_for_actor(dname) as portal: + lock.release() yield portal diff --git a/piker/data/feed.py b/piker/data/feed.py index 5d39c7c4..76c2bc23 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -116,7 +116,11 @@ def get_feed_bus( return _bus -async def _setup_persistent_brokerd(brokername: str) -> None: +@tractor.context +async def _setup_persistent_brokerd( + ctx: tractor.Context, + brokername: str +) -> None: """Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. @@ -129,6 +133,9 @@ async def _setup_persistent_brokerd(brokername: str) -> None: # background tasks from clients bus = get_feed_bus(brokername, service_nursery) + # unblock caller + await ctx.started() + # we pin this task to keep the feeds manager active until the # parent actor decides to tear it down await trio.sleep_forever() @@ -232,7 +239,7 @@ async def attach_feed_bus( brokername: str, symbol: str, loglevel: str, -): +) -> None: # try: if loglevel is None: @@ -274,6 +281,8 @@ async def attach_feed_bus( # send this even to subscribers to existing feed? await ctx.send_yield(init_msg) + + # deliver a first quote asap await ctx.send_yield(first_quote) if sub_only: