Use service cancel method for graceful teardown
							parent
							
								
									a3b282dffe
								
							
						
					
					
						commit
						3e72b59658
					
				| 
						 | 
					@ -35,10 +35,10 @@ log = get_logger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_root_dname = 'pikerd'
 | 
					_root_dname = 'pikerd'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_registry_addr = ('127.0.0.1', 6116)
 | 
					_registry_addr = ('127.0.0.1', 1616)
 | 
				
			||||||
_tractor_kwargs: dict[str, Any] = {
 | 
					_tractor_kwargs: dict[str, Any] = {
 | 
				
			||||||
    # use a different registry addr then tractor's default
 | 
					    # use a different registry addr then tractor's default
 | 
				
			||||||
    'arbiter_addr':  _registry_addr
 | 
					    'arbiter_addr': _registry_addr
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
_root_modules = [
 | 
					_root_modules = [
 | 
				
			||||||
    __name__,
 | 
					    __name__,
 | 
				
			||||||
| 
						 | 
					@ -91,14 +91,18 @@ class Services(BaseModel):
 | 
				
			||||||
                    log.info(
 | 
					                    log.info(
 | 
				
			||||||
                        f'`pikerd` service {name} started with value {first}'
 | 
					                        f'`pikerd` service {name} started with value {first}'
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                    # wait on any context's return value
 | 
					                    try:
 | 
				
			||||||
                    ctx_res = await ctx.result()
 | 
					                        # wait on any context's return value
 | 
				
			||||||
 | 
					                        ctx_res = await ctx.result()
 | 
				
			||||||
                # wait on any error from the sub-actor
 | 
					                    except tractor.ContextCancelled:
 | 
				
			||||||
                # NOTE: this will block indefinitely until cancelled
 | 
					                        return await self.cancel_service(name)
 | 
				
			||||||
                # either by error from the target context function or by
 | 
					                    else:
 | 
				
			||||||
                # being cancelled here by the surrounding cancel scope
 | 
					                        # wait on any error from the sub-actor
 | 
				
			||||||
                return (await portal.result(), ctx_res)
 | 
					                        # NOTE: this will block indefinitely until
 | 
				
			||||||
 | 
					                        # cancelled either by error from the target
 | 
				
			||||||
 | 
					                        # context function or by being cancelled here by
 | 
				
			||||||
 | 
					                        # the surrounding cancel scope
 | 
				
			||||||
 | 
					                        return (await portal.result(), ctx_res)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        cs, first = await self.service_n.start(open_context_in_task)
 | 
					        cs, first = await self.service_n.start(open_context_in_task)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -110,14 +114,17 @@ class Services(BaseModel):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: per service cancellation by scope, we aren't using this
 | 
					    # TODO: per service cancellation by scope, we aren't using this
 | 
				
			||||||
    # anywhere right?
 | 
					    # anywhere right?
 | 
				
			||||||
    # async def cancel_service(
 | 
					    async def cancel_service(
 | 
				
			||||||
    #     self,
 | 
					        self,
 | 
				
			||||||
    #     name: str,
 | 
					        name: str,
 | 
				
			||||||
    # ) -> Any:
 | 
					    ) -> Any:
 | 
				
			||||||
    #     log.info(f'Cancelling `pikerd` service {name}')
 | 
					        log.info(f'Cancelling `pikerd` service {name}')
 | 
				
			||||||
    #     cs, portal = self.service_tasks[name]
 | 
					        cs, portal = self.service_tasks[name]
 | 
				
			||||||
    #     cs.cancel()
 | 
					        # XXX: not entirely sure why this is required,
 | 
				
			||||||
    #     return await portal.cancel_actor()
 | 
					        # and should probably be better fine tuned in
 | 
				
			||||||
 | 
					        # ``tractor``?
 | 
				
			||||||
 | 
					        cs.cancel()
 | 
				
			||||||
 | 
					        return await portal.cancel_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_services: Optional[Services] = None
 | 
					_services: Optional[Services] = None
 | 
				
			||||||
| 
						 | 
					@ -395,6 +402,7 @@ async def maybe_spawn_daemon(
 | 
				
			||||||
        async with tractor.wait_for_actor(service_name) as portal:
 | 
					        async with tractor.wait_for_actor(service_name) as portal:
 | 
				
			||||||
            lock.release()
 | 
					            lock.release()
 | 
				
			||||||
            yield portal
 | 
					            yield portal
 | 
				
			||||||
 | 
					            await portal.cancel_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def spawn_brokerd(
 | 
					async def spawn_brokerd(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue