Kubernetes开发(3)-如何感知资源的实时变化
发布日期:2021-06-29 11:37:47 浏览次数:2 分类:技术文章

本文共 4737 字,大约阅读时间需要 15 分钟。

ClientSet的基本使用

日常运维一般使用kubectl进行资源的查看,创建,销毁等,对接一些自动化运维平台的话可能需要通过一些其他方法,k8s官方提供的方式就是通过clientset进行操作,举个Goclient的例子, 当然其他语言也有类似的sdk封装,为了贴合源码,主要还是学习go的:

package mainimport (	"flag"	"fmt"	"os"	"path/filepath"	"context"	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"	"k8s.io/client-go/kubernetes"	"k8s.io/client-go/rest"	"k8s.io/client-go/tools/clientcmd")func main() {	var err error	var config *rest.Config	var kubeconfig *string	if home := homeDir(); home != "" {		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")	} else {		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")	}	flag.Parse()	// 使用 ServiceAccount 创建集群配置(InCluster模式)	if config, err = rest.InClusterConfig(); err != nil {		// 使用 KubeConfig 文件创建集群配置		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {			panic(err.Error())		}	}	// 创建 clientset	clientset, err := kubernetes.NewForConfig(config)	if err != nil {		panic(err.Error())	}	// 使用 clientsent 获取 Deployments	deployments, err := clientset.AppsV1().Deployments("kube-system").List(context.TODO(), metav1.ListOptions{})	if err != nil {		panic(err)	}	for idx, deploy := range deployments.Items {		fmt.Printf("%d -> %s\n", idx+1, deploy.Name)	}}func homeDir() string {	if h := os.Getenv("HOME"); h != "" {		return h	}	return os.Getenv("USERPROFILE") // windows}

上述例子就是一个简单的show所有deployment的例子,逻辑也很简单,基本逻辑就是加载配置文件 – 生成clientset对象 – 对resource 进行操作 ,具体针对不同资源的处理逻辑在代码staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go

Informer

简单的操作k8s直接调用clientcmd等方法即可,即插即用,非常简单,但一个控制器在长期运行中,不可能写个死循环不停的通过clientset拼命调apiserver获取资源状态,这样对apiserver的压力可想而知,所以Kubernetes通过了一个Informer机制来减少apiserver压力的同时,也可以确保及时的获取到对应的资源信息,这个Informer机制如果用两个字来说,就是"缓存"。

  • 之前有笔记提到过Informer,可以参考下:

先说下informer实现的效果,客户端通过informer获取资源的状态,而不再直接访问apiserver, informer本地会提供一个Lister接口,用于用户端的调用,且informer可以及时的知道资源的状态变化,确保数据一定是和apiserver一致的。

基于实现效果再来看informer的原理,informer在初始化的时候,会首次从apiserver里全量获取需要的资源信息,缓存到本地,然后定时进行全量数据的拉取以及缓存,同时机遇http长链接的机制,和apiserver保持长链接,当资源发生变化的时候,apiserver会通过该长链接通知到informer,informer修改本地缓存的数据。比较有意思的是,informer从apiserver获取的数据是以事件形式获取的,而不是单纯的状态数据,这点后面会仔细看对应的结构体。

如果需要自己写一个informer来观察资源的状态,可以参考以下代码:

package mainimport (	"flag"	"fmt"	"path/filepath"	"time"	v1 "k8s.io/api/apps/v1"	"k8s.io/apimachinery/pkg/labels"	"k8s.io/client-go/informers"	"k8s.io/client-go/kubernetes"	"k8s.io/client-go/rest"	"k8s.io/client-go/tools/cache"	"k8s.io/client-go/tools/clientcmd"	"k8s.io/client-go/util/homedir")func main() {	var err error	var config *rest.Config	var kubeconfig *string	if home := homedir.HomeDir(); home != "" {		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")	} else {		kubeconfig = flag.String("kubeconfig", "", "kubeconfig 绝对路径")	}	// 初始化 rest.Config 对象	if config, err = rest.InClusterConfig(); err != nil {		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {			panic(err.Error())		}	}	// 创建 Clientset 对象	clientset, err := kubernetes.NewForConfig(config)	if err != nil {		panic(err.Error())	}	// 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次)	informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)	// 对 Deployment 监听	deployInformer := informerFactory.Apps().V1().Deployments()	// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)	informer := deployInformer.Informer()	// 创建 Lister	deployLister := deployInformer.Lister()	// 注册事件处理程序	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{		AddFunc:    onAdd,		UpdateFunc: onUpdate,		DeleteFunc: onDelete,	})	stopper := make(chan struct{})	defer close(stopper)	// 启动 informer,List & Watch	informerFactory.Start(stopper)	// 等待所有启动的 Informer 的缓存被同步	informerFactory.WaitForCacheSync(stopper)	// 从本地缓存中获取 default 中的所有 deployment 列表	deployments, err := deployLister.Deployments("default").List(labels.Everything())	if err != nil {		panic(err)	}	for idx, deploy := range deployments {		fmt.Printf("%d -> %s\n", idx+1, deploy.Name)	}	<-stopper}func onAdd(obj interface{}) {	deploy := obj.(*v1.Deployment)	fmt.Println("add a deployment:", deploy.Name)}func onUpdate(old, new interface{}) {	oldDeploy := old.(*v1.Deployment)	newDeploy := new.(*v1.Deployment)	fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)}func onDelete(obj interface{}) {	deploy := obj.(*v1.Deployment)	fmt.Println("delete a deployment:", deploy.Name)}

大致的代码逻辑:

  • 初始化一个informer工厂
  • 创建一类资源的监听对象,
  • 将该监听对象注册到informer工厂里,并生成该资源的informer对象
  • 给informer对象绑定需要的回调函数(删除,添加,更新等)
  • 启动informer并在第一次拉取全量数据到本地缓存
  • 开启Lister接口,让用户端可以调用

当然informer本身其实也是由多个组件实现的,这块后续分析,个人觉得informer这个缓存机制还是非常优秀的,舍弃了中间件,单纯通过http的方式将事件缓存到本地,用在自己的代码框架里也是不错的选择。

个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流

在这里插入图片描述

转载地址:https://blog.csdn.net/zyxpaomian/article/details/116798925 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Kubernetes开发(4)-webhook 实现拦截请求
下一篇:Kubernetes开发(2)—API 资源对象的串联过程

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2024年04月18日 13时41分11秒