Add remote context allocation api to service daemon

This allows for more deterministically managing long running sub-daemon
services under `pikerd` using the new context api from `tractor`.
The contexts are allocated in an async exit stack and torn down at root
daemon termination. Spawn brokerds using this method by changing the
persistence entry point to be a `@tractor.context`.
syseng_tweaks
Tyler Goodlet 2021-05-23 10:53:57 -04:00
parent 435e005d6e
commit dcc60524cb
2 changed files with 66 additions and 19 deletions

View File

@ -19,8 +19,9 @@ Structured, daemon tree service management.
""" """
from functools import partial from functools import partial
from typing import Optional, Union from typing import Optional, Union, Callable
from contextlib import asynccontextmanager from contextlib import asynccontextmanager, AsyncExitStack
from collections import defaultdict
from pydantic import BaseModel from pydantic import BaseModel
import trio import trio
@ -44,10 +45,34 @@ class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
ctx_stack: AsyncExitStack
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
async def open_remote_ctx(
self,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> tractor.Context:
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` tearodwn.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
ctx, first = await self.ctx_stack.enter_async_context(
portal.open_context(
target,
**kwargs,
)
)
return ctx
_services: Optional[Services] = None _services: Optional[Services] = None
@ -62,14 +87,14 @@ async def open_pikerd(
debug_mode: bool = False, debug_mode: bool = False,
) -> Optional[tractor._portal.Portal]: ) -> Optional[tractor._portal.Portal]:
""" '''
Start a root piker daemon who's lifetime extends indefinitely Start a root piker daemon who's lifetime extends indefinitely
until cancelled. until cancelled.
A root actor nursery is created which can be used to create and keep A root actor nursery is created which can be used to create and keep
alive underling services (see below). alive underling services (see below).
""" '''
global _services global _services
assert _services is None assert _services is None
@ -91,14 +116,18 @@ async def open_pikerd(
) as _, tractor.open_nursery() as actor_nursery: ) as _, tractor.open_nursery() as actor_nursery:
async with trio.open_nursery() as service_nursery: async with trio.open_nursery() as service_nursery:
# assign globally for future daemon/task creation # setup service mngr singleton instance
_services = Services( async with AsyncExitStack() as stack:
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services # assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
ctx_stack=stack,
)
yield _services
@asynccontextmanager @asynccontextmanager
@ -195,17 +224,19 @@ async def spawn_brokerd(
# call with and then have the ability to unwind the call whenevs. # call with and then have the ability to unwind the call whenevs.
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
_services.service_n.start_soon( await _services.open_remote_ctx(
partial( portal,
portal.run, _setup_persistent_brokerd,
_setup_persistent_brokerd, brokername=brokername,
brokername=brokername,
)
) )
return dname return dname
class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager @asynccontextmanager
async def maybe_spawn_brokerd( async def maybe_spawn_brokerd(
@ -224,9 +255,15 @@ async def maybe_spawn_brokerd(
dname = f'brokerd.{brokername}' dname = f'brokerd.{brokername}'
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[brokername]
await lock.acquire()
# attach to existing brokerd if possible # attach to existing brokerd if possible
async with tractor.find_actor(dname) as portal: async with tractor.find_actor(dname) as portal:
if portal is not None: if portal is not None:
lock.release()
yield portal yield portal
return return
@ -251,6 +288,7 @@ async def maybe_spawn_brokerd(
) )
async with tractor.wait_for_actor(dname) as portal: async with tractor.wait_for_actor(dname) as portal:
lock.release()
yield portal yield portal

View File

@ -116,7 +116,11 @@ def get_feed_bus(
return _bus return _bus
async def _setup_persistent_brokerd(brokername: str) -> None: @tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str
) -> None:
"""Allocate a actor-wide service nursery in ``brokerd`` """Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by such that feeds can be run in the background persistently by
the broker backend as needed. the broker backend as needed.
@ -129,6 +133,9 @@ async def _setup_persistent_brokerd(brokername: str) -> None:
# background tasks from clients # background tasks from clients
bus = get_feed_bus(brokername, service_nursery) bus = get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the # we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down # parent actor decides to tear it down
await trio.sleep_forever() await trio.sleep_forever()
@ -232,7 +239,7 @@ async def attach_feed_bus(
brokername: str, brokername: str,
symbol: str, symbol: str,
loglevel: str, loglevel: str,
): ) -> None:
# try: # try:
if loglevel is None: if loglevel is None:
@ -274,6 +281,8 @@ async def attach_feed_bus(
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
await ctx.send_yield(init_msg) await ctx.send_yield(init_msg)
# deliver a first quote asap
await ctx.send_yield(first_quote) await ctx.send_yield(first_quote)
if sub_only: if sub_only: