diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 34cbfca4..368e8116 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -19,9 +19,12 @@ Broker-daemon-actor "endpoint-hooks": the service task entry points for ``brokerd``. ''' +from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) +from typing import TYPE_CHECKING +import exceptiongroup as eg import tractor import trio @@ -29,6 +32,9 @@ import trio from . import _util from . import get_brokermod +if TYPE_CHECKING: + from ..data import _FeedsBus + # `brokerd` enabled modules # TODO: move this def to the `.data` subpkg.. # NOTE: keeping this list as small as possible is part of our caps-sec @@ -69,24 +75,40 @@ async def _setup_persistent_brokerd( # set global for this actor to this new process-wide instance B) _util.log = log - from piker.data.feed import ( - _bus, - get_feed_bus, - ) - global _bus - assert not _bus + from piker.data import feed + assert not feed._bus - async with trio.open_nursery() as service_nursery: - # assign a nursery to the feeds bus for spawning - # background tasks from clients - get_feed_bus(brokername, service_nursery) + # allocate a nursery to the bus for spawning background + # tasks to service client IPC requests, normally + # `tractor.Context` connections to explicitly required + # `brokerd` endpoints such as: + # - `stream_quotes()`, + # - `manage_history()`, + # - `allocate_persistent_feed()`, + # - `open_symbol_search()` + # NOTE: see ep invocation details inside `.data.feed`. + try: + async with trio.open_nursery() as service_nursery: + bus: _FeedsBus = feed.get_feed_bus( + brokername, + service_nursery, + ) + assert bus is feed._bus - # unblock caller - await ctx.started() + # unblock caller + await ctx.started() - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + except eg.ExceptionGroup: + # TODO: likely some underlying `brokerd` IPC connection + # broke so here we handle a respawn and re-connect attempt! + # This likely should pair with development of the OCO task + # nusery in dev over @ `tractor` B) + # https://github.com/goodboy/tractor/pull/363 + raise async def spawn_brokerd( diff --git a/piker/data/feed.py b/piker/data/feed.py index 1871db7d..ea7f360b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -190,7 +190,7 @@ _bus: _FeedsBus = None def get_feed_bus( brokername: str, - nursery: Optional[trio.Nursery] = None, + nursery: trio.Nursery | None = None, ) -> _FeedsBus: '''