.brokers._daemon: add notes around needed brokerd respawn tech

basic_buy_bot
Tyler Goodlet 2023-06-20 17:57:00 -04:00
parent a44bc4aeb3
commit 35359861bb
2 changed files with 38 additions and 16 deletions

View File

@ -19,9 +19,12 @@ Broker-daemon-actor "endpoint-hooks": the service task entry points for
``brokerd``. ``brokerd``.
''' '''
from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from typing import TYPE_CHECKING
import exceptiongroup as eg
import tractor import tractor
import trio import trio
@ -29,6 +32,9 @@ import trio
from . import _util from . import _util
from . import get_brokermod from . import get_brokermod
if TYPE_CHECKING:
from ..data import _FeedsBus
# `brokerd` enabled modules # `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg.. # TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec # NOTE: keeping this list as small as possible is part of our caps-sec
@ -69,24 +75,40 @@ async def _setup_persistent_brokerd(
# set global for this actor to this new process-wide instance B) # set global for this actor to this new process-wide instance B)
_util.log = log _util.log = log
from piker.data.feed import ( from piker.data import feed
_bus, assert not feed._bus
get_feed_bus,
)
global _bus
assert not _bus
async with trio.open_nursery() as service_nursery: # allocate a nursery to the bus for spawning background
# assign a nursery to the feeds bus for spawning # tasks to service client IPC requests, normally
# background tasks from clients # `tractor.Context` connections to explicitly required
get_feed_bus(brokername, service_nursery) # `brokerd` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with trio.open_nursery() as service_nursery:
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller # unblock caller
await ctx.started() await ctx.started()
# we pin this task to keep the feeds manager active until the # we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down # parent actor decides to tear it down
await trio.sleep_forever() await trio.sleep_forever()
except eg.ExceptionGroup:
# TODO: likely some underlying `brokerd` IPC connection
# broke so here we handle a respawn and re-connect attempt!
# This likely should pair with development of the OCO task
# nusery in dev over @ `tractor` B)
# https://github.com/goodboy/tractor/pull/363
raise
async def spawn_brokerd( async def spawn_brokerd(

View File

@ -190,7 +190,7 @@ _bus: _FeedsBus = None
def get_feed_bus( def get_feed_bus(
brokername: str, brokername: str,
nursery: Optional[trio.Nursery] = None, nursery: trio.Nursery | None = None,
) -> _FeedsBus: ) -> _FeedsBus:
''' '''