Use service cancel method for graceful teardown
parent
acba4e8f02
commit
6d1a3dfdc5
|
@ -35,10 +35,10 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
_root_dname = 'pikerd'
|
_root_dname = 'pikerd'
|
||||||
|
|
||||||
_registry_addr = ('127.0.0.1', 6116)
|
_registry_addr = ('127.0.0.1', 1616)
|
||||||
_tractor_kwargs: dict[str, Any] = {
|
_tractor_kwargs: dict[str, Any] = {
|
||||||
# use a different registry addr then tractor's default
|
# use a different registry addr then tractor's default
|
||||||
'arbiter_addr': _registry_addr
|
'arbiter_addr': _registry_addr
|
||||||
}
|
}
|
||||||
_root_modules = [
|
_root_modules = [
|
||||||
__name__,
|
__name__,
|
||||||
|
@ -91,14 +91,18 @@ class Services(BaseModel):
|
||||||
log.info(
|
log.info(
|
||||||
f'`pikerd` service {name} started with value {first}'
|
f'`pikerd` service {name} started with value {first}'
|
||||||
)
|
)
|
||||||
# wait on any context's return value
|
try:
|
||||||
ctx_res = await ctx.result()
|
# wait on any context's return value
|
||||||
|
ctx_res = await ctx.result()
|
||||||
# wait on any error from the sub-actor
|
except tractor.ContextCancelled:
|
||||||
# NOTE: this will block indefinitely until cancelled
|
return await self.cancel_service(name)
|
||||||
# either by error from the target context function or by
|
else:
|
||||||
# being cancelled here by the surrounding cancel scope
|
# wait on any error from the sub-actor
|
||||||
return (await portal.result(), ctx_res)
|
# NOTE: this will block indefinitely until
|
||||||
|
# cancelled either by error from the target
|
||||||
|
# context function or by being cancelled here by
|
||||||
|
# the surrounding cancel scope
|
||||||
|
return (await portal.result(), ctx_res)
|
||||||
|
|
||||||
cs, first = await self.service_n.start(open_context_in_task)
|
cs, first = await self.service_n.start(open_context_in_task)
|
||||||
|
|
||||||
|
@ -110,14 +114,17 @@ class Services(BaseModel):
|
||||||
|
|
||||||
# TODO: per service cancellation by scope, we aren't using this
|
# TODO: per service cancellation by scope, we aren't using this
|
||||||
# anywhere right?
|
# anywhere right?
|
||||||
# async def cancel_service(
|
async def cancel_service(
|
||||||
# self,
|
self,
|
||||||
# name: str,
|
name: str,
|
||||||
# ) -> Any:
|
) -> Any:
|
||||||
# log.info(f'Cancelling `pikerd` service {name}')
|
log.info(f'Cancelling `pikerd` service {name}')
|
||||||
# cs, portal = self.service_tasks[name]
|
cs, portal = self.service_tasks[name]
|
||||||
# cs.cancel()
|
# XXX: not entirely sure why this is required,
|
||||||
# return await portal.cancel_actor()
|
# and should probably be better fine tuned in
|
||||||
|
# ``tractor``?
|
||||||
|
cs.cancel()
|
||||||
|
return await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
_services: Optional[Services] = None
|
_services: Optional[Services] = None
|
||||||
|
@ -372,6 +379,7 @@ async def maybe_spawn_daemon(
|
||||||
async with tractor.wait_for_actor(service_name) as portal:
|
async with tractor.wait_for_actor(service_name) as portal:
|
||||||
lock.release()
|
lock.release()
|
||||||
yield portal
|
yield portal
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
async def spawn_brokerd(
|
async def spawn_brokerd(
|
||||||
|
|
Loading…
Reference in New Issue