{ "name": "/user.slice" }]

时间:2022-01-01 09:08:36

标签:

一、heapster简介 什么是Heapster?

Heapster是容器集群监控和性能分析工具,天然的撑持Kubernetes和CoreOS。
Kubernetes有个着名的监控agent---cAdvisor。在每个kubernetes Node上城市运行cAdvisor,它会收集本机以及容器的监控数据(cpu,memory,filesystem,network,uptime)。
在较新的版本中,K8S已经将cAdvisor成果集成到kubelet组件中。每个Node节点可以直接进行web访谒。

cAdvisor web界面访谒: < Node-IP >:4194
cAdvisor也供给Restful API: https://github.com/google/cadv ... pi.md

Heapster是一个收集者,将每个Node上的cAdvisor的数据进行汇总,然后导到第三方工具(如InfluxDB)。

{ "name": "/user.slice" }]

二、heapster挪用kubelet源码分析 1、整体源码

heapster整体的源码分析可以参考文章:,

上面链接中的文章中会对heapster从启动开始进行分析,主要讲述了下面内容:

main()

创建数据源

创建后端处事

创建数据processors

获取源数据并存储

heapster API创建

给上述文章的作者点个赞,思路真的非常清晰。不过笔者最存眷的点是heapster如何进行的kubelet挪用。

2、挪用kubelet源码分析

从上述文章提到的 NewKubeletProvider 函数中,我们找到了创建kubeClient和kubeletClient的处所,见下面代码的飘黄部分:

func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) { // 创建kubernetes master及kubelet client相关的配置 kubeConfig, kubeletConfig, err := GetKubeConfigs(uri) if err != nil { return nil, err } // 创建kubeClient及kubeletClient kubeClient := kube_client.NewOrDie(kubeConfig) kubeletClient, err := NewKubeletClient(kubeletConfig) if err != nil { return nil, err } // 获取下所有的Nodes,测试下创建的client是否能正常通讯 if _, err := kubeClient.Nodes().List(kube_api.ListOptions{ LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}); err != nil { glog.Errorf("Failed to load nodes: %v", err) } // 监控k8s的nodes变换 // 这里会创建协程进行watch,便于后面挪用nodeLister.List()列出所有的nodes。 // 该Watch的实现,需要看下apiServer中的实现,后面会进行讲解 lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything()) nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour) reflector.Run() // 布局在前面介绍过 return &kubeletProvider{ nodeLister: nodeLister, reflector: reflector, kubeletClient: kubeletClient, }, nil }

1)heapster源码目录

我们发明这个函数地址的文件为kubelet.go,地址的包为metrics/sources/kubelet

我们可以看到,kubelet挪用相关的代码都在这个里面:

{ "name": "/user.slice" }]

2)heapster挪用kubelet的API

我们进入到NewKubeletClient函数看一下:

// 传入的是KubeletClientConfig,通过读取相关配置,来初始化client func NewKubeletClient(kubeletConfig *kubelet_client.KubeletClientConfig) (*KubeletClient, error) { transport, err := kubelet_client.MakeTransport(kubeletConfig) if err != nil { return nil, err } c := &http.Client{ // 此处可以看到,是http挪用 Transport: transport, Timeout: kubeletConfig.HTTPTimeout, } return &KubeletClient{ config: kubeletConfig, client: c, }, nil }

接下来我们看一下kubelet_client.go中KubeletClient有哪些要领:

(i)获取所有的containersInfo

第一个,获取所有的container统计信息,如果对cAdvisor对照了解的话,,可以进入到该要领的返回值的ContainerInfo里面看下,长短常全面的容器监控信息(后续会有cAdvisor的源码分析,敬请等候)

func (self *KubeletClient) getAllContainers(url string, start, end time.Time) ([]cadvisor.ContainerInfo, error) { // Request data from all subcontainers.此处是结构requestBody request := statsRequest{ ContainerName: "http://www.mamicode.com/", NumStats: 1, Start: start, // 2017-11-10T06:46:17Z 这种utc格局的时间戳 End: end, Subcontainers: true, } body, err := json.Marshal(request) if err != nil { return nil, err } req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) if err != nil { return nil, err }
// 设置请求头 req.Header.Set(
"Content-Type", "application/json") var containers map[string]cadvisor.ContainerInfo client := self.client if client == nil { client = http.DefaultClient } err = self.postRequestAndGetValue(client, req, &containers) if err != nil { return nil, fmt.Errorf("failed to get all container stats from Kubelet URL %q: %v", url, err) } result := make([]cadvisor.ContainerInfo, 0, len(containers)) for _, containerInfo := range containers { cont := self.parseStat(&containerInfo) if cont != nil { result = append(result, *cont) } } return result, nil }

我们可以看到上面的源码其实是结构了一个http的post请求。那么我们在本地通过restClient模拟一下: