Better separate service tasks vs. ctxs via methods

Namely, split the handles for each into 2 separate tables and add
a `.cancel_service_task()` method.

Also,
- move `_open_and_supervise_service_ctx()` to module level.
- rename the `target` -> `ctx_fn` param throughout.
- fill out method docstrings.
branch: hilevel_serman
Tyler Goodlet 2024-12-11 14:24:49 -05:00
parent 46dbe6d2fc
commit 840c328f19
1 changed file with 201 additions and 143 deletions
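For orientation, here is a minimal caller-side sketch of the split after this commit: ctx-backed service daemons are registered in `.service_ctxs` and torn down with `.cancel_service()`, while locally scheduled bg tasks live in `.service_tasks` and are torn down with the new `.cancel_service_task()`. The `sampler` endpoint, the `'samplerd'` name and the bare `open_service_mngr()` call are illustrative assumptions, not part of the diff:

    import tractor
    import trio


    @tractor.context
    async def sampler(ctx: tractor.Context) -> None:
        # hypothetical long-running service endpoint
        await ctx.started('ready')
        await trio.sleep_forever()


    async def main() -> None:
        # assumption: `open_service_mngr()` can be entered without args
        async with open_service_mngr() as mngr:

            # ctx-backed "service daemon": tracked in `mngr.service_ctxs`
            ipc_ctx = await mngr.start_service(
                daemon_name='samplerd',
                ctx_ep=sampler,
            )
            # ... later: cancel the remote ctx, then the sub-actor
            await mngr.cancel_service('samplerd')

            # a purely local bg task tracked in `mngr.service_tasks`
            # is instead torn down via:
            # await mngr.cancel_service_task('<task_name>')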


@@ -197,7 +197,7 @@ async def open_service_mngr(
             yield mngr
         finally:
             # TODO: is this more clever/efficient?
-            # if 'samplerd' in mngr.service_tasks:
+            # if 'samplerd' in mngr.service_ctxs:
             #     await mngr.cancel_service('samplerd')
             tn.cancel_scope.cancel()
@@ -231,6 +231,108 @@ def get_service_mngr() -> ServiceMngr:
     return maybe_mngr
 
 
+async def _open_and_supervise_service_ctx(
+    serman: ServiceMngr,
+    name: str,
+    ctx_fn: Callable,  # TODO, type for `@tractor.context` requirement
+    portal: Portal,
+
+    allow_overruns: bool = False,
+    task_status: TaskStatus[
+        tuple[
+            trio.CancelScope,
+            Context,
+            trio.Event,
+            Any,
+        ]
+    ] = trio.TASK_STATUS_IGNORED,
+    **ctx_kwargs,
+
+) -> Any:
+    '''
+    Open a remote IPC-context defined by `ctx_fn` in the
+    (service) actor accessed via `portal` and supervise the
+    (local) parent task to termination at which point the remote
+    actor runtime is cancelled alongside it.
+
+    The main application is for allocating long-running
+    "sub-services" in a main daemon and explicitly controlling
+    their lifetimes from an actor-global singleton.
+
+    '''
+    # TODO: use the ctx._scope directly here instead?
+    # -[ ] actually what semantics do we expect for this
+    #   usage!?
+    with trio.CancelScope() as cs:
+        try:
+            async with portal.open_context(
+                ctx_fn,
+                allow_overruns=allow_overruns,
+                **ctx_kwargs,
+            ) as (ctx, started):
+
+                # unblock once the remote context has started
+                complete = trio.Event()
+                task_status.started((
+                    cs,
+                    ctx,
+                    complete,
+                    started,
+                ))
+                log.info(
+                    f'`pikerd` service {name} started with value {started}'
+                )
+                # wait on any context's return value
+                # and any final portal result from the
+                # sub-actor.
+                ctx_res: Any = await ctx.wait_for_result()
+
+                # NOTE: blocks indefinitely until cancelled
+                # either by error from the target context
+                # function or by being cancelled here by the
+                # surrounding cancel scope.
+                return (
+                    await portal.wait_for_result(),
+                    ctx_res,
+                )
+
+        except ContextCancelled as ctxe:
+            canceller: tuple[str, str] = ctxe.canceller
+            our_uid: tuple[str, str] = current_actor().uid
+            if (
+                canceller != portal.chan.uid
+                and
+                canceller != our_uid
+            ):
+                log.cancel(
+                    f'Actor-service `{name}` was remotely cancelled by a peer?\n'
+
+                    # TODO: this would be a good spot to use
+                    # a respawn feature Bo
+                    f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
+
+                    f'cancellee: {portal.chan.uid}\n'
+                    f'canceller: {canceller}\n'
+                )
+            else:
+                raise
+
+        finally:
+            # NOTE: the ctx MUST be cancelled first if we
+            # don't want the above `ctx.wait_for_result()` to
+            # raise a self-ctxc. WHY, well since from the ctx's
+            # perspective the cancel request will have
+            # arrived out-out-of-band at the `Actor.cancel()`
+            # level, thus `Context.cancel_called == False`,
+            # meaning `ctx._is_self_cancelled() == False`.
+            # with trio.CancelScope(shield=True):
+            #     await ctx.cancel()
+            await portal.cancel_actor()  # terminate (remote) sub-actor
+            complete.set()  # signal caller this task is done
+            serman.service_ctxs.pop(name)  # remove mngr entry
+
+
 # TODO: we need remote wrapping and a general soln:
 # - factor this into a ``tractor.highlevel`` extension # pack for the
 #   library.
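Worth noting: the new module-level supervisor above is driven via `trio`'s `Nursery.start()` handoff, i.e. the caller blocks until `task_status.started(...)` hands back the cancel scope, ctx, completion event and `started` value. A self-contained, trio-only sketch of that handoff pattern (generic names, not the piker code):

    import trio


    async def supervised(
        task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
    ) -> None:
        complete = trio.Event()
        with trio.CancelScope() as cs:
            # hand the supervision primitives back to the `.start()` caller
            task_status.started((cs, complete))
            try:
                # stand-in for `portal.open_context()` + waiting on results
                await trio.sleep_forever()
            finally:
                complete.set()  # signal the caller this task is done


    async def main() -> None:
        async with trio.open_nursery() as tn:
            cs, complete = await tn.start(supervised)
            cs.cancel()            # analogous to `.cancel_service_task()`
            await complete.wait()


    trio.run(main)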
@@ -252,6 +354,14 @@ class ServiceMngr:
     debug_mode: bool = False # tractor sub-actor debug mode flag
 
     service_tasks: dict[
+        str,
+        tuple[
+            trio.CancelScope,
+            trio.Event,
+        ]
+    ] = field(default_factory=dict)
+
+    service_ctxs: dict[
         str,
         tuple[
             trio.CancelScope,
@@ -319,8 +429,6 @@ class ServiceMngr:
         # retstart if needed.
         self.service_tasks[name] = (
             cs,
-            None,
-            None,
             complete,
         )
         return (
@@ -328,13 +436,33 @@ class ServiceMngr:
             complete,
         )
 
+    async def cancel_service_task(
+        self,
+        name: str,
+
+    ) -> Any:
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, complete = self.service_tasks[name]
+        cs.cancel()
+        await complete.wait()
+        # TODO, if we use the `TaskMngr` from #346
+        # we can also get the return value from the task!
+
+        if name in self.service_tasks:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service task {name!r} not terminated!?\n'
+            )
+
     async def start_service_ctx(
         self,
         name: str,
         portal: Portal,
         # TODO: typevar for the return type of the target and then
         # use it below for `ctx_res`?
-        target: Callable,
+        ctx_fn: Callable,
         **ctx_kwargs,
 
     ) -> tuple[
@@ -342,150 +470,41 @@ class ServiceMngr:
         Context,
         Any,
     ]:
-        cs, sub_ctx, complete, started = await self.service_n.start(
+        '''
+        Start a remote IPC-context defined by `ctx_fn` in a background
+        task and immediately return supervision primitives to manage it:
+
+        - a `cs: CancelScope` for the newly allocated bg task
+        - the `ipc_ctx: Context` to manage the remotely scheduled
+          `trio.Task`.
+        - the `started: Any` value returned by the remote endpoint
+          task's `Context.started(<value>)` call.
+
+        The bg task supervises the ctx such that when it terminates the supporting
+        actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
+        for details.
+
+        '''
+        cs, ipc_ctx, complete, started = await self.service_n.start(
             functools.partial(
-                self._open_and_supervise_service_ctx,
+                _open_and_supervise_service_ctx,
+                serman=self,
                 name=name,
-                target=target,
+                ctx_fn=ctx_fn,
                 portal=portal,
                 **ctx_kwargs,
             )
         )
 
         # store the cancel scope and portal for later cancellation or
         # retstart if needed.
-        self.service_tasks[name] = (cs, sub_ctx, portal, complete)
+        self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
         return (
             cs,
-            sub_ctx,
+            ipc_ctx,
             started,
         )
 
-    async def _open_and_supervise_service_ctx(
-        self,
-        name: str,
-        target: Callable,  # TODO, type for `@tractor.context` requirement
-        portal: Portal,
-
-        allow_overruns: bool = False,
-        task_status: TaskStatus[
-            tuple[
-                trio.CancelScope,
-                Context,
-                trio.Event,
-                Any,
-            ]
-        ] = trio.TASK_STATUS_IGNORED,
-        **ctx_kwargs,
-
-    ) -> Any:
-        '''
-        Open a context in a service sub-actor, add to a stack
-        that gets unwound at ``pikerd`` teardown.
-
-        This allows for allocating long-running sub-services in our main
-        daemon and explicitly controlling their lifetimes.
-
-        '''
-        # TODO: use the ctx._scope directly here instead?
-        # -[ ] actually what semantics do we expect for this
-        #   usage!?
-        with trio.CancelScope() as cs:
-            try:
-                async with portal.open_context(
-                    target,
-                    allow_overruns=allow_overruns,
-                    **ctx_kwargs,
-                ) as (ctx, started):
-
-                    # unblock once the remote context has started
-                    complete = trio.Event()
-                    task_status.started((
-                        cs,
-                        ctx,
-                        complete,
-                        started,
-                    ))
-                    log.info(
-                        f'`pikerd` service {name} started with value {started}'
-                    )
-                    # wait on any context's return value
-                    # and any final portal result from the
-                    # sub-actor.
-                    ctx_res: Any = await ctx.wait_for_result()
-
-                    # NOTE: blocks indefinitely until cancelled
-                    # either by error from the target context
-                    # function or by being cancelled here by the
-                    # surrounding cancel scope.
-                    return (
-                        await portal.wait_for_result(),
-                        ctx_res,
-                    )
-
-            except ContextCancelled as ctxe:
-                canceller: tuple[str, str] = ctxe.canceller
-                our_uid: tuple[str, str] = current_actor().uid
-                if (
-                    canceller != portal.chan.uid
-                    and
-                    canceller != our_uid
-                ):
-                    log.cancel(
-                        f'Actor-service `{name}` was remotely cancelled by a peer?\n'
-
-                        # TODO: this would be a good spot to use
-                        # a respawn feature Bo
-                        f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
-
-                        f'cancellee: {portal.chan.uid}\n'
-                        f'canceller: {canceller}\n'
-                    )
-                else:
-                    raise
-
-            finally:
-                # NOTE: the ctx MUST be cancelled first if we
-                # don't want the above `ctx.wait_for_result()` to
-                # raise a self-ctxc. WHY, well since from the ctx's
-                # perspective the cancel request will have
-                # arrived out-out-of-band at the `Actor.cancel()`
-                # level, thus `Context.cancel_called == False`,
-                # meaning `ctx._is_self_cancelled() == False`.
-                # with trio.CancelScope(shield=True):
-                #     await ctx.cancel()
-                await portal.cancel_actor()  # terminate (remote) sub-actor
-                complete.set()  # signal caller this task is done
-                self.service_tasks.pop(name)  # remove mngr entry
-
-    async def cancel_service(
-        self,
-        name: str,
-
-    ) -> Any:
-        '''
-        Cancel the service task and actor for the given ``name``.
-
-        '''
-        log.info(f'Cancelling `pikerd` service {name}')
-        cs, sub_ctx, portal, complete = self.service_tasks[name]
-
-        # cs.cancel()
-        await sub_ctx.cancel()
-        await complete.wait()
-
-        if name in self.service_tasks:
-            # TODO: custom err?
-            # raise ServiceError(
-            raise RuntimeError(
-                f'Service actor for {name} not terminated and/or unknown?'
-            )
-        # assert name not in self.service_tasks, \
-        #     f'Serice task for {name} not terminated?'
-
     async def start_service(
         self,
         daemon_name: str,
@@ -493,23 +512,36 @@ class ServiceMngr:
         # ^TODO, type for `@tractor.context` deco-ed funcs!
         debug_mode: bool = False,
-        **tractor_actor_kwargs,
+        **start_actor_kwargs,
 
     ) -> Context:
         '''
-        Start a "service" task in a (new) sub-actor and manage its
-        lifetime indefinitely until termination.
-
-        Service actors can be cancelled (and thus shutdown) using
-        `.cancel_service()`.
+        Start new subactor and schedule a supervising "service task"
+        in it which explicitly defines the sub's lifetime.
+
+        "Service daemon subactors" are cancelled (and thus
+        terminated) using the paired `.cancel_service()`.
+
+        Effectively this API can be used to manage "service daemons"
+        spawned under a single parent actor with supervision
+        semantics equivalent to a one-cancels-one style actor-nursery
+        or "(subactor) task manager" where each subprocess's (and
+        thus its embedded actor runtime) lifetime is synced to that
+        of the remotely spawned task defined by `ctx_ep`.
+
+        The funcionality can be likened to a "daemonized" version of
+        `.hilevel.worker.run_in_actor()` but with supervision
+        controls offered by `tractor.Context` where the main/root
+        remotely scheduled `trio.Task` invoking `ctx_ep` determines
+        the underlying subactor's lifetime.
 
         '''
-        entry: tuple|None = self.service_tasks.get(daemon_name)
+        entry: tuple|None = self.service_ctxs.get(daemon_name)
         if entry:
             (cs, sub_ctx, portal, complete) = entry
             return sub_ctx
 
-        if daemon_name not in self.service_tasks:
+        if daemon_name not in self.service_ctxs:
             portal: Portal = await self.actor_n.start_actor(
                 daemon_name,
                 debug_mode=( # maybe set globally during allocate
@@ -517,7 +549,7 @@ class ServiceMngr:
                     or
                     self.debug_mode
                 ),
-                **tractor_actor_kwargs,
+                **start_actor_kwargs,
             )
             ctx_kwargs: dict[str, Any] = {}
             if isinstance(ctx_ep, functools.partial):
@@ -531,8 +563,34 @@ class ServiceMngr:
             ) = await self.start_service_ctx(
                 name=daemon_name,
                 portal=portal,
-                target=ctx_ep,
+                ctx_fn=ctx_ep,
                 **ctx_kwargs,
             )
 
         return sub_ctx
 
+    async def cancel_service(
+        self,
+        name: str,
+
+    ) -> Any:
+        '''
+        Cancel the service task and actor for the given ``name``.
+
+        '''
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, sub_ctx, portal, complete = self.service_ctxs[name]
+
+        # cs.cancel()
+        await sub_ctx.cancel()
+        await complete.wait()
+
+        if name in self.service_ctxs:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service actor for {name} not terminated and/or unknown?'
+            )
+        # assert name not in self.service_ctxs, \
+        #     f'Serice task for {name} not terminated?'
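Since `start_service()` unpacks keyword arguments when `ctx_ep` is a `functools.partial` (per the `isinstance(ctx_ep, functools.partial)` branch above, presumably splitting `.func`/`.keywords`), per-service config can plausibly be passed like the following; the `feedd` endpoint and its kwargs are illustrative only, not part of this commit:

    import functools

    import tractor
    import trio


    @tractor.context
    async def feedd(
        ctx: tractor.Context,
        symbols: list[str],
    ) -> None:
        # hypothetical endpoint: serve data for `symbols` until cancelled
        await ctx.started(symbols)
        await trio.sleep_forever()


    async def spawn_feed_daemon(mngr) -> None:
        # kwargs baked into the partial are forwarded to the remote
        # `@tractor.context` endpoint via `Portal.open_context()`.
        await mngr.start_service(
            daemon_name='feedd',
            ctx_ep=functools.partial(
                feedd,
                symbols=['BTCUSDT'],
            ),
        )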