From 844626f6dc9e1f94e616bd76cb66e78f4f30fd9a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 12 Jan 2023 10:16:16 -0500 Subject: [PATCH] Move `brokerd` service task to root `.data` mod --- piker/data/__init__.py | 44 ++++++++++++++++++++++++++++++++++++++++-- piker/data/feed.py | 35 ++++----------------------------- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index e98195b4..5c83150e 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -22,6 +22,12 @@ and storing data from your brokers as well as sharing live streams over a network. """ +import tractor +import trio + +from ..log import ( + get_console_log, +) from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -32,7 +38,6 @@ from ._sharedmem import ( ) from .feed import ( open_feed, - _setup_persistent_brokerd, ) @@ -44,5 +49,40 @@ __all__ = [ 'attach_shm_array', 'open_shm_array', 'get_shm_token', - '_setup_persistent_brokerd', ] + + +@tractor.context +async def _setup_persistent_brokerd( + ctx: tractor.Context, + brokername: str, + +) -> None: + ''' + Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + ''' + get_console_log(tractor.current_actor().loglevel) + + from .feed import ( + _bus, + get_feed_bus, + ) + global _bus + assert not _bus + + async with trio.open_nursery() as service_nursery: + # assign a nursery to the feeds bus for spawning + # background tasks from clients + get_feed_bus(brokername, service_nursery) + + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + diff --git a/piker/data/feed.py b/piker/data/feed.py index 88e9ceed..b714c77e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -54,7 +54,10 @@ import numpy as np from ..brokers import get_brokermod from ..calc import humanize -from ..log import get_logger, get_console_log +from ..log import ( + get_logger, + get_console_log, +) from .._daemon import ( maybe_spawn_brokerd, check_for_service, @@ -224,36 +227,6 @@ def get_feed_bus( return _bus -@tractor.context -async def _setup_persistent_brokerd( - ctx: tractor.Context, - brokername: str, - -) -> None: - ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. - - ''' - get_console_log(tractor.current_actor().loglevel) - - global _bus - assert not _bus - - async with trio.open_nursery() as service_nursery: - # assign a nursery to the feeds bus for spawning - # background tasks from clients - get_feed_bus(brokername, service_nursery) - - # unblock caller - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - - def diff_history( array: np.ndarray, timeframe: int,