Merge pull request #203 from pikers/wait_on_daemon_portals

Wait on daemon portals, concurrently.
ci_on_forks
goodboy 2021-07-07 07:47:31 -04:00 committed by GitHub
commit 0675b1fb10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 107 additions and 39 deletions

View File

@ -19,11 +19,12 @@ Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager, AsyncExitStack
from contextlib import asynccontextmanager
from collections import defaultdict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log
@ -45,36 +46,79 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
ctx_stack: AsyncExitStack
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def open_remote_ctx(
async def start_service_task(
self,
name: str,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> tractor.Context:
) -> (trio.CancelScope, tractor.Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` tearodwn.
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
ctx, first = await self.ctx_stack.enter_async_context(
portal.open_context(
target,
**kwargs,
)
)
return ctx
async def open_context_in_task(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
**kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
# wait on any context's return value
ctx_res = await ctx.result()
log.info(
f'`pikerd` service {name} started with value {ctx_res}'
)
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or
# by being cancelled here by the surroundingn cancel
# scope
return await (portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None
@ -117,22 +161,22 @@ async def open_pikerd(
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# setup service mngr singleton instance
async with AsyncExitStack() as stack:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
ctx_stack=stack,
)
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services
yield _services
@asynccontextmanager
@ -174,16 +218,20 @@ async def maybe_open_pikerd(
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
@ -209,7 +257,7 @@ class Brokerd:
async def maybe_spawn_daemon(
service_name: str,
spawn_func: Callable,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,
@ -219,6 +267,13 @@ async def maybe_spawn_daemon(
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
"""
if loglevel:
get_console_log(loglevel)
@ -246,13 +301,24 @@ async def maybe_spawn_daemon(
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_func(**spawn_args)
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service startingn is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run(
spawn_func,
service_task_target,
**spawn_args,
)
@ -267,7 +333,7 @@ async def spawn_brokerd(
loglevel: Optional[str] = None,
**tractor_kwargs,
) -> tractor._portal.Portal:
) -> bool:
log.info(f'Spawning {brokername} broker daemon')
@ -280,6 +346,8 @@ async def spawn_brokerd(
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
portal = await _services.actor_n.start_actor(
dname,
enable_modules=_data_mods + [brokermod.__name__],
@ -291,13 +359,13 @@ async def spawn_brokerd(
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.open_remote_ctx(
await _services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return dname
return True
@asynccontextmanager
@ -314,7 +382,7 @@ async def maybe_spawn_brokerd(
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
@ -328,7 +396,7 @@ async def spawn_emsd(
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> tractor._portal.Portal:
) -> bool:
"""
Start the clearing engine under ``pikerd``.
@ -352,12 +420,12 @@ async def spawn_emsd(
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.open_remote_ctx(
await _services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,
)
return 'emsd'
return True
@asynccontextmanager
@ -372,7 +440,7 @@ async def maybe_open_emsd(
async with maybe_spawn_daemon(
'emsd',
spawn_func=spawn_emsd,
service_task_target=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,

View File

@ -53,6 +53,6 @@ def resproc(
log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text)
else:
log.trace(f"Received json contents:\n{colorize_json(json)}")
log.debug(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp