Use service cancel method for graceful teardown

big_data_lines
Tyler Goodlet 2022-03-16 09:54:52 -04:00
parent 03a08b5f63
commit bedb55b79d
1 changed files with 26 additions and 18 deletions

View File

@ -35,10 +35,10 @@ log = get_logger(__name__)
_root_dname = 'pikerd' _root_dname = 'pikerd'
_registry_addr = ('127.0.0.1', 6116) _registry_addr = ('127.0.0.1', 1616)
_tractor_kwargs: dict[str, Any] = { _tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default # use a different registry addr then tractor's default
'arbiter_addr': _registry_addr 'arbiter_addr': _registry_addr
} }
_root_modules = [ _root_modules = [
__name__, __name__,
@ -91,14 +91,18 @@ class Services(BaseModel):
log.info( log.info(
f'`pikerd` service {name} started with value {first}' f'`pikerd` service {name} started with value {first}'
) )
# wait on any context's return value try:
ctx_res = await ctx.result() # wait on any context's return value
ctx_res = await ctx.result()
# wait on any error from the sub-actor except tractor.ContextCancelled:
# NOTE: this will block indefinitely until cancelled return await self.cancel_service(name)
# either by error from the target context function or by else:
# being cancelled here by the surrounding cancel scope # wait on any error from the sub-actor
return (await portal.result(), ctx_res) # NOTE: this will block indefinitely until
# cancelled either by error from the target
# context function or by being cancelled here by
# the surrounding cancel scope
return (await portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task) cs, first = await self.service_n.start(open_context_in_task)
@ -110,14 +114,17 @@ class Services(BaseModel):
# TODO: per service cancellation by scope, we aren't using this # TODO: per service cancellation by scope, we aren't using this
# anywhere right? # anywhere right?
# async def cancel_service( async def cancel_service(
# self, self,
# name: str, name: str,
# ) -> Any: ) -> Any:
# log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
# cs, portal = self.service_tasks[name] cs, portal = self.service_tasks[name]
# cs.cancel() # XXX: not entirely sure why this is required,
# return await portal.cancel_actor() # and should probably be better fine tuned in
# ``tractor``?
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None _services: Optional[Services] = None
@ -372,6 +379,7 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
await portal.cancel_actor()
async def spawn_brokerd( async def spawn_brokerd(