本文共 4737 字,大约阅读时间需要 15 分钟。
ClientSet的基本使用
日常运维一般使用kubectl进行资源的查看,创建,销毁等,对接一些自动化运维平台的话可能需要通过一些其他方法,k8s官方提供的方式就是通过clientset进行操作,举个Goclient的例子, 当然其他语言也有类似的sdk封装,为了贴合源码,主要还是学习go的:
package mainimport ( "flag" "fmt" "os" "path/filepath" "context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd")func main() { var err error var config *rest.Config var kubeconfig *string if home := homeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() // 使用 ServiceAccount 创建集群配置(InCluster模式) if config, err = rest.InClusterConfig(); err != nil { // 使用 KubeConfig 文件创建集群配置 if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } // 创建 clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 使用 clientsent 获取 Deployments deployments, err := clientset.AppsV1().Deployments("kube-system").List(context.TODO(), metav1.ListOptions{}) if err != nil { panic(err) } for idx, deploy := range deployments.Items { fmt.Printf("%d -> %s\n", idx+1, deploy.Name) }}func homeDir() string { if h := os.Getenv("HOME"); h != "" { return h } return os.Getenv("USERPROFILE") // windows}
上述例子就是一个简单的show所有deployment的例子,逻辑也很简单,基本逻辑就是加载配置文件 – 生成clientset对象 – 对resource 进行操作 ,具体针对不同资源的处理逻辑在代码staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go里
Informer
简单的操作k8s直接调用clientcmd等方法即可,即插即用,非常简单,但一个控制器在长期运行中,不可能写个死循环不停的通过clientset拼命调apiserver获取资源状态,这样对apiserver的压力可想而知,所以Kubernetes通过了一个Informer机制来减少apiserver压力的同时,也可以确保及时的获取到对应的资源信息,这个Informer机制如果用两个字来说,就是"缓存"。
- 之前有笔记提到过Informer,可以参考下:
先说下informer实现的效果,客户端通过informer获取资源的状态,而不再直接访问apiserver, informer本地会提供一个Lister接口,用于用户端的调用,且informer可以及时的知道资源的状态变化,确保数据一定是和apiserver一致的。
基于实现效果再来看informer的原理,informer在初始化的时候,会首次从apiserver里全量获取需要的资源信息,缓存到本地,然后定时进行全量数据的拉取以及缓存,同时机遇http长链接的机制,和apiserver保持长链接,当资源发生变化的时候,apiserver会通过该长链接通知到informer,informer修改本地缓存的数据。比较有意思的是,informer从apiserver获取的数据是以事件形式获取的,而不是单纯的状态数据,这点后面会仔细看对应的结构体。如果需要自己写一个informer来观察资源的状态,可以参考以下代码:
package mainimport ( "flag" "fmt" "path/filepath" "time" v1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir")func main() { var err error var config *rest.Config var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径") } else { kubeconfig = flag.String("kubeconfig", "", "kubeconfig 绝对路径") } // 初始化 rest.Config 对象 if config, err = rest.InClusterConfig(); err != nil { if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } // 创建 Clientset 对象 clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次) informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30) // 对 Deployment 监听 deployInformer := informerFactory.Apps().V1().Deployments() // 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源) informer := deployInformer.Informer() // 创建 Lister deployLister := deployInformer.Lister() // 注册事件处理程序 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: onUpdate, DeleteFunc: onDelete, }) stopper := make(chan struct{}) defer close(stopper) // 启动 informer,List & Watch informerFactory.Start(stopper) // 等待所有启动的 Informer 的缓存被同步 informerFactory.WaitForCacheSync(stopper) // 从本地缓存中获取 default 中的所有 deployment 列表 deployments, err := deployLister.Deployments("default").List(labels.Everything()) if err != nil { panic(err) } for idx, deploy := range deployments { fmt.Printf("%d -> %s\n", idx+1, deploy.Name) } <-stopper}func onAdd(obj interface{}) { deploy := obj.(*v1.Deployment) fmt.Println("add a deployment:", deploy.Name)}func onUpdate(old, new interface{}) { oldDeploy := old.(*v1.Deployment) newDeploy := new.(*v1.Deployment) fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)}func onDelete(obj interface{}) { deploy := obj.(*v1.Deployment) fmt.Println("delete a deployment:", deploy.Name)}
大致的代码逻辑:
- 初始化一个informer工厂
- 创建一类资源的监听对象,
- 将该监听对象注册到informer工厂里,并生成该资源的informer对象
- 给informer对象绑定需要的回调函数(删除,添加,更新等)
- 启动informer并在第一次拉取全量数据到本地缓存
- 开启Lister接口,让用户端可以调用
当然informer本身其实也是由多个组件实现的,这块后续分析,个人觉得informer这个缓存机制还是非常优秀的,舍弃了中间件,单纯通过http的方式将事件缓存到本地,用在自己的代码框架里也是不错的选择。
个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流
转载地址:https://blog.csdn.net/zyxpaomian/article/details/116798925 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!