k8s源码分析-----kubelet启动流程分析

时间:2021-08-24 16:48:24

源码为k8s v1.3.0稳定版本

一、代码结构

(1) 代码起始位置

 kubernetes\cmd\kubelet\kubelet.go

k8s源码分析-----kubelet启动流程分析


(2) 业务逻辑代码

k8s源码分析-----kubelet启动流程分析


二 、初始化启动过程

(1) main函数的处理

main函数的过程,基本属于k8s的统一风格,代码位于 kubernetes\cmd\kubelet\kubelet.go中

k8s源码分析-----kubelet启动流程分析

关注一下NewKubeletServer

// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
glog.Errorf("NewKubeletServer is called")
config := componentconfig.KubeletConfiguration{}
api.Scheme.Convert(&v1alpha1.KubeletConfiguration{}, &config)
return &KubeletServer{
AuthPath: util.NewStringFlag("/var/lib/kubelet/kubernetes_auth"), // deprecated
KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"),
KubeletConfiguration: config,
}
}

// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
componentconfig.KubeletConfiguration

AuthPath util.StringFlag // Deprecated -- use KubeConfig instead
KubeConfig util.StringFlag
APIServerList []string

RunOnce bool

// Insert a probability of random errors during calls to the master.
ChaosChance float64
// Crash immediately, rather than eating panics.
ReallyCrashForTesting bool
}

k8s源码分析-----kubelet启动流程分析

KubeletConfiguration的定义:

type KubeletConfiguration struct {
unversioned.TypeMeta

// config is the path to the config file or directory of files
Config string `json:"config"`
// syncFrequency is the max period between synchronizing running
// containers and config
SyncFrequency unversioned.Duration `json:"syncFrequency"`
// fileCheckFrequency is the duration between checking config files for
// new data
FileCheckFrequency unversioned.Duration `json:"fileCheckFrequency"`
// httpCheckFrequency is the duration between checking http for new data
HTTPCheckFrequency unversioned.Duration `json:"httpCheckFrequency"`
// manifestURL is the URL for accessing the container manifest
ManifestURL string `json:"manifestURL"`
// manifestURLHeader is the HTTP header to use when accessing the manifest
// URL, with the key separated from the value with a ':', as in 'key:value'
ManifestURLHeader string `json:"manifestURLHeader"`
// enableServer enables the Kubelet's server
EnableServer bool `json:"enableServer"`
// address is the IP address for the Kubelet to serve on (set to 0.0.0.0
// for all interfaces)
Address string `json:"address"`
// port is the port for the Kubelet to serve on.
Port int32 `json:"port"`
// readOnlyPort is the read-only port for the Kubelet to serve on with
// no authentication/authorization (set to 0 to disable)
ReadOnlyPort int32 `json:"readOnlyPort"`
// tlsCertFile is the file containing x509 Certificate for HTTPS. (CA cert,
// if any, concatenated after server cert). If tlsCertFile and
// tlsPrivateKeyFile are not provided, a self-signed certificate
// and key are generated for the public address and saved to the directory
// passed to certDir.
TLSCertFile string `json:"tlsCertFile"`
// tlsPrivateKeyFile is the ile containing x509 private key matching
// tlsCertFile.
TLSPrivateKeyFile string `json:"tlsPrivateKeyFile"`
// certDirectory is the directory where the TLS certs are located (by
// default /var/run/kubernetes). If tlsCertFile and tlsPrivateKeyFile
// are provided, this flag will be ignored.
CertDirectory string `json:"certDirectory"`
// hostnameOverride is the hostname used to identify the kubelet instead
// of the actual hostname.
HostnameOverride string `json:"hostnameOverride"`
// podInfraContainerImage is the image whose network/ipc namespaces
// containers in each pod will use.
PodInfraContainerImage string `json:"podInfraContainerImage"`
// dockerEndpoint is the path to the docker endpoint to communicate with.
DockerEndpoint string `json:"dockerEndpoint"`
// rootDirectory is the directory path to place kubelet files (volume
// mounts,etc).
RootDirectory string `json:"rootDirectory"`
// seccompProfileRoot is the directory path for seccomp profiles.
SeccompProfileRoot string `json:"seccompProfileRoot"`
// allowPrivileged enables containers to request privileged mode.
// Defaults to false.
AllowPrivileged bool `json:"allowPrivileged"`
// hostNetworkSources is a comma-separated list of sources from which the
// Kubelet allows pods to use of host network. Defaults to "*". Valid
// options are "file", "http", "api", and "*" (all sources).
HostNetworkSources []string `json:"hostNetworkSources"`
// hostPIDSources is a comma-separated list of sources from which the
// Kubelet allows pods to use the host pid namespace. Defaults to "*".
HostPIDSources []string `json:"hostPIDSources"`
// hostIPCSources is a comma-separated list of sources from which the
// Kubelet allows pods to use the host ipc namespace. Defaults to "*".
HostIPCSources []string `json:"hostIPCSources"`
// registryPullQPS is the limit of registry pulls per second. If 0,
// unlimited. Set to 0 for no limit. Defaults to 5.0.
RegistryPullQPS int32 `json:"registryPullQPS"`
// registryBurst is the maximum size of a bursty pulls, temporarily allows
// pulls to burst to this number, while still not exceeding registryQps.
// Only used if registryQPS > 0.
RegistryBurst int32 `json:"registryBurst"`
// eventRecordQPS is the maximum event creations per second. If 0, there
// is no limit enforced.
EventRecordQPS int32 `json:"eventRecordQPS"`
// eventBurst is the maximum size of a bursty event records, temporarily
// allows event records to burst to this number, while still not exceeding
// event-qps. Only used if eventQps > 0
EventBurst int32 `json:"eventBurst"`
// enableDebuggingHandlers enables server endpoints for log collection
// and local running of containers and commands
EnableDebuggingHandlers bool `json:"enableDebuggingHandlers"`
// minimumGCAge is the minimum age for a finished container before it is
// garbage collected.
MinimumGCAge unversioned.Duration `json:"minimumGCAge"`
// maxPerPodContainerCount is the maximum number of old instances to
// retain per container. Each container takes up some disk space.
MaxPerPodContainerCount int32 `json:"maxPerPodContainerCount"`
// maxContainerCount is the maximum number of old instances of containers
// to retain globally. Each container takes up some disk space.
MaxContainerCount int32 `json:"maxContainerCount"`
// cAdvisorPort is the port of the localhost cAdvisor endpoint
CAdvisorPort int32 `json:"cAdvisorPort"`
// healthzPort is the port of the localhost healthz endpoint
HealthzPort int32 `json:"healthzPort"`
// healthzBindAddress is the IP address for the healthz server to serve
// on.
HealthzBindAddress string `json:"healthzBindAddress"`
// oomScoreAdj is The oom-score-adj value for kubelet process. Values
// must be within the range [-1000, 1000].
OOMScoreAdj int32 `json:"oomScoreAdj"`
// registerNode enables automatic registration with the apiserver.
RegisterNode bool `json:"registerNode"`
// clusterDomain is the DNS domain for this cluster. If set, kubelet will
// configure all containers to search this domain in addition to the
// host's search domains.
ClusterDomain string `json:"clusterDomain"`
// masterServiceNamespace is The namespace from which the kubernetes
// master services should be injected into pods.
MasterServiceNamespace string `json:"masterServiceNamespace"`
// clusterDNS is the IP address for a cluster DNS server. If set, kubelet
// will configure all containers to use this for DNS resolution in
// addition to the host's DNS servers
ClusterDNS string `json:"clusterDNS"`
// streamingConnectionIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamingConnectionIdleTimeout unversioned.Duration `json:"streamingConnectionIdleTimeout"`
// nodeStatusUpdateFrequency is the frequency that kubelet posts node
// status to master. Note: be cautious when changing the constant, it
// must work with nodeMonitorGracePeriod in nodecontroller.
NodeStatusUpdateFrequency unversioned.Duration `json:"nodeStatusUpdateFrequency"`
// imageMinimumGCAge is the minimum age for a unused image before it is
// garbage collected.
ImageMinimumGCAge unversioned.Duration `json:"imageMinimumGCAge"`
// imageGCHighThresholdPercent is the percent of disk usage after which
// image garbage collection is always run.
ImageGCHighThresholdPercent int32 `json:"imageGCHighThresholdPercent"`
// imageGCLowThresholdPercent is the percent of disk usage before which
// image garbage collection is never run. Lowest disk usage to garbage
// collect to.
ImageGCLowThresholdPercent int32 `json:"imageGCLowThresholdPercent"`
// lowDiskSpaceThresholdMB is the absolute free disk space, in MB, to
// maintain. When disk space falls below this threshold, new pods would
// be rejected.
LowDiskSpaceThresholdMB int32 `json:"lowDiskSpaceThresholdMB"`
// How frequently to calculate and cache volume disk usage for all pods
VolumeStatsAggPeriod unversioned.Duration `json:"volumeStatsAggPeriod"`
// networkPluginName is the name of the network plugin to be invoked for
// various events in kubelet/pod lifecycle
NetworkPluginName string `json:"networkPluginName"`
// networkPluginDir is the full path of the directory in which to search
// for network plugins
NetworkPluginDir string `json:"networkPluginDir"`
// volumePluginDir is the full path of the directory in which to search
// for additional third party volume plugins
VolumePluginDir string `json:"volumePluginDir"`
// cloudProvider is the provider for cloud services.
CloudProvider string `json:"cloudProvider,omitempty"`
// cloudConfigFile is the path to the cloud provider configuration file.
CloudConfigFile string `json:"cloudConfigFile,omitempty"`
// KubeletCgroups is the absolute name of cgroups to isolate the kubelet in.
KubeletCgroups string `json:"kubeletCgroups,omitempty"`
// Enable QoS based Cgroup hierarchy: top level cgroups for QoS Classes
// And all Burstable and BestEffort pods are brought up under their
// specific top level QoS cgroup.
CgroupsPerQOS bool `json:"CgroupsPerQOS,omitempty"`
// Cgroups that container runtime is expected to be isolated in.
RuntimeCgroups string `json:"runtimeCgroups,omitempty"`
// SystemCgroups is absolute name of cgroups in which to place
// all non-kernel processes that are not already in a container. Empty
// for no container. Rolling back the flag requires a reboot.
SystemCgroups string `json:"systemCgroups,omitempty"`
// CgroupRoot is the root cgroup to use for pods.
// If CgroupsPerQOS is enabled, this is the root of the QoS cgroup hierarchy.
CgroupRoot string `json:"cgroupRoot,omitempty"`
// containerRuntime is the container runtime to use.
ContainerRuntime string `json:"containerRuntime"`
// runtimeRequestTimeout is the timeout for all runtime requests except long running
// requests - pull, logs, exec and attach.
RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"`
// rktPath is the path of rkt binary. Leave empty to use the first rkt in
// $PATH.
RktPath string `json:"rktPath,omitempty"`
// rktApiEndpoint is the endpoint of the rkt API service to communicate with.
RktAPIEndpoint string `json:"rktAPIEndpoint,omitempty"`
// rktStage1Image is the image to use as stage1. Local paths and
// http/https URLs are supported.
RktStage1Image string `json:"rktStage1Image,omitempty"`
// lockFilePath is the path that kubelet will use to as a lock file.
// It uses this file as a lock to synchronize with other kubelet processes
// that may be running.
LockFilePath string `json:"lockFilePath"`
// ExitOnLockContention is a flag that signifies to the kubelet that it is running
// in "bootstrap" mode. This requires that 'LockFilePath' has been set.
// This will cause the kubelet to listen to inotify events on the lock file,
// releasing it and exiting when another process tries to open that file.
ExitOnLockContention bool `json:"exitOnLockContention"`
// configureCBR0 enables the kublet to configure cbr0 based on
// Node.Spec.PodCIDR.
ConfigureCBR0 bool `json:"configureCbr0"`
// How should the kubelet configure the container bridge for hairpin packets.
// Setting this flag allows endpoints in a Service to loadbalance back to
// themselves if they should try to access their own Service. Values:
// "promiscuous-bridge": make the container bridge promiscuous.
// "hairpin-veth": set the hairpin flag on container veth interfaces.
// "none": do nothing.
// Setting --configure-cbr0 to false implies that to achieve hairpin NAT
// one must set --hairpin-mode=veth-flag, because bridge assumes the
// existence of a container bridge named cbr0.
HairpinMode string `json:"hairpinMode"`
// The node has babysitter process monitoring docker and kubelet.
BabysitDaemons bool `json:"babysitDaemons"`
// maxPods is the number of pods that can run on this Kubelet.
MaxPods int32 `json:"maxPods"`
// nvidiaGPUs is the number of NVIDIA GPU devices on this node.
NvidiaGPUs int32 `json:"nvidiaGPUs"`
// dockerExecHandlerName is the handler to use when executing a command
// in a container. Valid values are 'native' and 'nsenter'. Defaults to
// 'native'.
DockerExecHandlerName string `json:"dockerExecHandlerName"`
// The CIDR to use for pod IP addresses, only used in standalone mode.
// In cluster mode, this is obtained from the master.
PodCIDR string `json:"podCIDR"`
// ResolverConfig is the resolver configuration file used as the basis
// for the container DNS resolution configuration."), []
ResolverConfig string `json:"resolvConf"`
// cpuCFSQuota is Enable CPU CFS quota enforcement for containers that
// specify CPU limits
CPUCFSQuota bool `json:"cpuCFSQuota"`
// containerized should be set to true if kubelet is running in a container.
Containerized bool `json:"containerized"`
// maxOpenFiles is Number of files that can be opened by Kubelet process.
MaxOpenFiles int64 `json:"maxOpenFiles"`
// reconcileCIDR is Reconcile node CIDR with the CIDR specified by the
// API server. No-op if register-node or configure-cbr0 is false.
ReconcileCIDR bool `json:"reconcileCIDR"`
// registerSchedulable tells the kubelet to register the node as
// schedulable. No-op if register-node is false.
RegisterSchedulable bool `json:"registerSchedulable"`
// contentType is contentType of requests sent to apiserver.
ContentType string `json:"contentType"`
// kubeAPIQPS is the QPS to use while talking with kubernetes apiserver
KubeAPIQPS int32 `json:"kubeAPIQPS"`
// kubeAPIBurst is the burst to allow while talking with kubernetes
// apiserver
KubeAPIBurst int32 `json:"kubeAPIBurst"`
// serializeImagePulls when enabled, tells the Kubelet to pull images one
// at a time. We recommend *not* changing the default value on nodes that
// run docker daemon with version < 1.9 or an Aufs storage backend.
// Issue #10959 has more details.
SerializeImagePulls bool `json:"serializeImagePulls"`
// experimentalFlannelOverlay enables experimental support for starting the
// kubelet with the default overlay network (flannel). Assumes flanneld
// is already running in client mode.
ExperimentalFlannelOverlay bool `json:"experimentalFlannelOverlay"`
// outOfDiskTransitionFrequency is duration for which the kubelet has to
// wait before transitioning out of out-of-disk node condition status.
OutOfDiskTransitionFrequency unversioned.Duration `json:"outOfDiskTransitionFrequency,omitempty"`
// nodeIP is IP address of the node. If set, kubelet will use this IP
// address for the node.
NodeIP string `json:"nodeIP,omitempty"`
// nodeLabels to add when registering the node in the cluster.
NodeLabels map[string]string `json:"nodeLabels"`
// nonMasqueradeCIDR configures masquerading: traffic to IPs outside this range will use IP masquerade.
NonMasqueradeCIDR string `json:"nonMasqueradeCIDR"`
// enable gathering custom metrics.
EnableCustomMetrics bool `json:"enableCustomMetrics"`
// Comma-delimited list of hard eviction expressions. For example, 'memory.available<300Mi'.
EvictionHard string `json:"evictionHard,omitempty"`
// Comma-delimited list of soft eviction expressions. For example, 'memory.available<300Mi'.
EvictionSoft string `json:"evictionSoft,omitempty"`
// Comma-delimeted list of grace periods for each soft eviction signal. For example, 'memory.available=30s'.
EvictionSoftGracePeriod string `json:"evictionSoftGracePeriod,omitempty"`
// Duration for which the kubelet has to wait before transitioning out of an eviction pressure condition.
EvictionPressureTransitionPeriod unversioned.Duration `json:"evictionPressureTransitionPeriod,omitempty"`
// Maximum allowed grace period (in seconds) to use when terminating pods in response to a soft eviction threshold being met.
EvictionMaxPodGracePeriod int32 `json:"evictionMaxPodGracePeriod,omitempty"`
// Maximum number of pods per core. Cannot exceed MaxPods
PodsPerCore int32 `json:"podsPerCore"`
// enableControllerAttachDetach enables the Attach/Detach controller to
// manage attachment/detachment of volumes scheduled to this node, and
// disables kubelet from executing any attach/detach operations
EnableControllerAttachDetach bool `json:"enableControllerAttachDetach"`
// A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs
// that describe resources reserved for non-kubernetes components.
// Currently only cpu and memory are supported. [default=none]
// See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.md for more detail.
SystemReserved utilconfig.ConfigurationMap `json:"systemReserved"`
// A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs
// that describe resources reserved for kubernetes system components.
// Currently only cpu and memory are supported. [default=none]
// See http://releases.k8s.io/HEAD/docs/user-guide/compute-resources.md for more detail.
KubeReserved utilconfig.ConfigurationMap `json:"kubeReserved"`
}

(2) 进入到Run函数中:

简单跳转,进入到run函数:
// Run runs the specified KubeletServer for the given KubeletConfig.  This should never exit.
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func Run(s *options.KubeletServer, kcfg *KubeletConfig) error {
glog.Errorf(" begin to KubeletServer Run.")
err := run(s, kcfg)
if err != nil {
glog.Errorf("Failed running kubelet: %v", err)
}else {
glog.Errorf(" running kubelet succeed.")
}
return err
}

func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}

done := make(chan struct{})
if s.LockFilePath != "" {
glog.Infof("aquiring lock on %q", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to aquire file lock on %q: %v", s.LockFilePath, err)
}
if s.ExitOnLockContention {
glog.Infof("watching for inotify events for: %v", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}
if c, err := configz.New("componentconfig"); err == nil {
c.Set(s.KubeletConfiguration)
} else {
glog.Errorf("unable to register configz: %s", err)
}
if kcfg == nil {
cfg, err := UnsecuredKubeletConfig(s)
if err != nil {
return err
}
kcfg = cfg

clientConfig, err := CreateAPIServerClientConfig(s)
if err == nil {
kcfg.KubeClient, err = clientset.NewForConfig(clientConfig)

// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kcfg.EventClient, err = clientset.NewForConfig(&eventClientConfig)
}
if err != nil && len(s.APIServerList) > 0 {
glog.Warningf("No API client: %v", err)
}

if s.CloudProvider == kubeExternal.AutoDetectCloudProvider {
kcfg.AutoDetectCloudProvider = true
} else {
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
kcfg.Cloud = cloud
}
}

if kcfg.CAdvisorInterface == nil {
kcfg.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), kcfg.ContainerRuntime)
if err != nil {
return err
}
}

if kcfg.ContainerManager == nil {
if kcfg.SystemCgroups != "" && kcfg.CgroupRoot == "" {
return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
}
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface, cm.NodeConfig{
RuntimeCgroupsName: kcfg.RuntimeCgroups,
SystemCgroupsName: kcfg.SystemCgroups,
KubeletCgroupsName: kcfg.KubeletCgroups,
ContainerRuntime: kcfg.ContainerRuntime,
CgroupsPerQOS: kcfg.CgroupsPerQOS,
CgroupRoot: kcfg.CgroupRoot,
})
if err != nil {
return err
}
}

runtime.ReallyCrash = s.ReallyCrashForTesting
rand.Seed(time.Now().UTC().UnixNano())

// TODO(vmarmol): Do this through container config.
oomAdjuster := kcfg.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
glog.Warning(err)
}

if err := RunKubelet(kcfg); err != nil {
return err
}

if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}

if s.RunOnce {
return nil
}

<-done
return nil
}

run函数的主要步骤有:

step1: 创建一个done的channel,实现主Goroutine的运行

step2:KubeletConfiguration的转换

step3:  UnsecuredKubeletConfig调用,里面创建KubeletConfig(重点)

step4:CreateAPIServerClientConfig

step5: CloudProvider的初始化

step6:   cadvisor对象的初始化

step7:  NewContainerManager

step8: oomAdjuster对象的初始化

step9: RunKubelet (下一级的执行函数)

step10: healthz.DefaultHealthz (这里启动了一个协程)


(3) UnsecuredKubeletConfig分析

// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup
// is not valid. It will not start any background processes, and does not include authentication/authorization
func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
hostNetworkSources, err := kubetypes.GetValidatedSources(s.HostNetworkSources)
if err != nil {
return nil, err
}

hostPIDSources, err := kubetypes.GetValidatedSources(s.HostPIDSources)
if err != nil {
return nil, err
}

hostIPCSources, err := kubetypes.GetValidatedSources(s.HostIPCSources)
if err != nil {
return nil, err
}

mounter := mount.New()
var writer io.Writer = &io.StdWriter{}
if s.Containerized {
glog.V(2).Info("Running kubelet in containerized mode (experimental)")
mounter = mount.NewNsenterMounter()
writer = &io.NsenterWriter{}
}

tlsOptions, err := InitializeTLS(s)
if err != nil {
return nil, err
}

var dockerExecHandler dockertools.ExecHandler
switch s.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}

imageGCPolicy := kubelet.ImageGCPolicy{
MinAge: s.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(s.ImageGCHighThresholdPercent),
LowThresholdPercent: int(s.ImageGCLowThresholdPercent),
}

diskSpacePolicy := kubelet.DiskSpacePolicy{
DockerFreeDiskMB: int(s.LowDiskSpaceThresholdMB),
RootFreeDiskMB: int(s.LowDiskSpaceThresholdMB),
}

manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}

reservation, err := parseReservation(s.KubeReserved, s.SystemReserved)
if err != nil {
return nil, err
}

thresholds, err := eviction.ParseThresholdConfig(s.EvictionHard, s.EvictionSoft, s.EvictionSoftGracePeriod)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: s.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(s.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
}

return &KubeletConfig{
Address: net.ParseIP(s.Address),
AllowPrivileged: s.AllowPrivileged,
Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // launches background processes, not set here
VolumeStatsAggPeriod: s.VolumeStatsAggPeriod.Duration,
CgroupRoot: s.CgroupRoot,
Cloud: nil, // cloud provider might start background processes
ClusterDNS: net.ParseIP(s.ClusterDNS),
ClusterDomain: s.ClusterDomain,
ConfigFile: s.Config,
ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime,
RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
RuntimeCgroups: s.RuntimeCgroups,
DockerExecHandler: dockerExecHandler,
EnableControllerAttachDetach: s.EnableControllerAttachDetach,
EnableCustomMetrics: s.EnableCustomMetrics,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
CgroupsPerQOS: s.CgroupsPerQOS,
EnableServer: s.EnableServer,
EventBurst: int(s.EventBurst),
EventRecordQPS: float32(s.EventRecordQPS),
FileCheckFrequency: s.FileCheckFrequency.Duration,
HostnameOverride: s.HostnameOverride,
HostNetworkSources: hostNetworkSources,
HostPIDSources: hostPIDSources,
HostIPCSources: hostIPCSources,
HTTPCheckFrequency: s.HTTPCheckFrequency.Duration,
ImageGCPolicy: imageGCPolicy,
KubeClient: nil,
ManifestURL: s.ManifestURL,
ManifestURLHeader: manifestURLHeader,
MasterServiceNamespace: s.MasterServiceNamespace,
MaxContainerCount: int(s.MaxContainerCount),
MaxOpenFiles: uint64(s.MaxOpenFiles),
MaxPerPodContainerCount: int(s.MaxPerPodContainerCount),
MaxPods: int(s.MaxPods),
NvidiaGPUs: int(s.NvidiaGPUs),
MinimumGCAge: s.MinimumGCAge.Duration,
Mounter: mounter,
NetworkPluginName: s.NetworkPluginName,
NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir),
NodeLabels: s.NodeLabels,
NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency.Duration,
NonMasqueradeCIDR: s.NonMasqueradeCIDR,
OOMAdjuster: oom.NewOOMAdjuster(),
OSInterface: kubecontainer.RealOS{},
PodCIDR: s.PodCIDR,
ReconcileCIDR: s.ReconcileCIDR,
PodInfraContainerImage: s.PodInfraContainerImage,
Port: uint(s.Port),
ReadOnlyPort: uint(s.ReadOnlyPort),
RegisterNode: s.RegisterNode,
RegisterSchedulable: s.RegisterSchedulable,
RegistryBurst: int(s.RegistryBurst),
RegistryPullQPS: float64(s.RegistryPullQPS),
ResolverConfig: s.ResolverConfig,
Reservation: *reservation,
KubeletCgroups: s.KubeletCgroups,
RktPath: s.RktPath,
RktAPIEndpoint: s.RktAPIEndpoint,
RktStage1Image: s.RktStage1Image,
RootDirectory: s.RootDirectory,
SeccompProfileRoot: s.SeccompProfileRoot,
Runonce: s.RunOnce,
SerializeImagePulls: s.SerializeImagePulls,
StandaloneMode: (len(s.APIServerList) == 0),
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout.Duration,
SyncFrequency: s.SyncFrequency.Duration,
SystemCgroups: s.SystemCgroups,
TLSOptions: tlsOptions,
Writer: writer,
VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir),
OutOfDiskTransitionFrequency: s.OutOfDiskTransitionFrequency.Duration,
HairpinMode: s.HairpinMode,
BabysitDaemons: s.BabysitDaemons,
ExperimentalFlannelOverlay: s.ExperimentalFlannelOverlay,
NodeIP: net.ParseIP(s.NodeIP),
EvictionConfig: evictionConfig,
PodsPerCore: int(s.PodsPerCore),
}, nil
}

step1: 根据配置创建hostNetworkSources,hostPIDSources,hostIPCSources对象,该函数后面会用到

step2: mounter,writer,tlsOptions对象的创建

step3: dockerExecHandler的初始化

step4:imageGCPolicy初始化

step5: diskSpacePolicy初始化

step6:  manifestURLHeader,reservation,thresholds,evictionConfig对象的初始化

step7: 构建KubeletConfig对象,在这个构建函数中,同时构建了几个关键的对象;


例如:

dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration)
ProbeNetworkPlugins(s.NetworkPluginDir)
ProbeVolumePlugins(s.VolumePluginDir)
oom.NewOOMAdjuster()

(4) ConnectToDockerOrDie分析


// ConnectToDockerOrDie creates docker client connecting to docker daemon.
// If the endpoint passed in is "fake://", a fake docker client
// will be returned. The program exits if error occurs. The requestTimeout
// is the timeout for docker requests. If timeout is exceeded, the request
// will be cancelled and throw out an error. If requestTimeout is 0, a default
// value will be applied.
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
if dockerEndpoint == "fake://" {
return NewFakeDockerClient()
}
client, err := getDockerClient(dockerEndpoint)
if err != nil {
glog.Fatalf("Couldn't connect to docker: %v", err)
}
glog.Infof("Start docker client with request timeout=%v", requestTimeout)
return newKubeDockerClient(client, requestTimeout)
}

// newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0,
// defaultTimeout will be applied.
func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface {
if requestTimeout == 0 {
requestTimeout = defaultTimeout
}
return &kubeDockerClient{
client: dockerClient,
timeout: requestTimeout,
}
}

// DockerInterface is an abstract interface for testability.  It abstracts the interface of docker client.
type DockerInterface interface {
ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
InspectContainer(id string) (*dockertypes.ContainerJSON, error)
CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
StartContainer(id string) error
StopContainer(id string, timeout int) error
RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
InspectImage(image string) (*dockertypes.ImageInspect, error)
ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
ImageHistory(id string) ([]dockertypes.ImageHistory, error)
Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
Version() (*dockertypes.Version, error)
Info() (*dockertypes.Info, error)
CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
ResizeContainerTTY(id string, height, width int) error
ResizeExecTTY(id string, height, width int) error
}

该函数中初始化了一个kubeDockerClient对象,该对象包括一组DockerInterface。后续处理中,关于DockerInterface的操作,实际的执行体为该对象。


(5)  RunKubelet 函数分析

// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) error {
kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride)

if len(kcfg.NodeName) == 0 {
// Query the cloud provider for our node name, default to Hostname
nodeName := kcfg.Hostname
if kcfg.Cloud != nil {
var err error
instances, ok := kcfg.Cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}

nodeName, err = instances.CurrentNodeName(kcfg.Hostname)
if err != nil {
return fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
}

glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
}

kcfg.NodeName = nodeName
}

eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
eventBroadcaster.StartLogging(glog.V(3).Infof)
if kcfg.EventClient != nil {
glog.V(4).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kcfg.EventClient.Events("")})
} else {
glog.Warning("No api server defined - no events will be sent to API server.")
}

privilegedSources := capabilities.PrivilegedSources{
HostNetworkSources: kcfg.HostNetworkSources,
HostPIDSources: kcfg.HostPIDSources,
HostIPCSources: kcfg.HostIPCSources,
}
capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0)

credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
glog.V(2).Infof("Using root directory: %v", kcfg.RootDirectory)

builder := kcfg.Builder
if builder == nil {
builder = CreateAndInitKubelet
}
if kcfg.OSInterface == nil {
kcfg.OSInterface = kubecontainer.RealOS{}
}
k, podCfg, err := builder(kcfg)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}

util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)

// TODO(dawnchen): remove this once we deprecated old debian containervm images.
// This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
// The current chosen number is consistent with most of other os dist.
const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
const minKeys uint64 = 1000000
key, err := ioutil.ReadFile(maxkeysPath)
if err != nil {
glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
} else {
fields := strings.Fields(string(key))
nkey, _ := strconv.ParseUint(fields[0], 10, 64)
if nkey < minKeys {
glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
if err != nil {
glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
}
}
}
const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
const minBytes uint64 = 25000000
bytes, err := ioutil.ReadFile(maxbytesPath)
if err != nil {
glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
} else {
fields := strings.Fields(string(bytes))
nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
if nbyte < minBytes {
glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
if err != nil {
glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
}
}
}

// process pods and exit.
if kcfg.Runonce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet %s as runonce", version.Get().String())
} else {
startKubelet(k, podCfg, kcfg)
glog.Errorf("Started kubelet %s", version.Get().String())
}
return nil
}

step1: KubeletConfig中NodeName的设置

step2: NewBroadcaster与API Server交互(待验证)

step3: privilegedSources capabilities初始化

step4: SetPreferredDockercfgPath设置

step5: CreateAndInitKubelet,该函数的作用从名字中就可以看出,创建并初始化kubelet(重要)

step6: ApplyRLimitForSelf

step7: 中间的代码,好像是是修改了系统的某些参数,解决一个debug

step8: startKubelet进入下一级的初始化过程


(6)  CreateAndInitKubelet函数分析

CreateAndInitKubelet主要做了两件事情,创建了kubecontainer.ContainerGCPolicy,然后进行到NewMainKubelet中进行kubelet大量的对象创建工作

func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient clientset.Interface
if kc.KubeClient != nil {
kubeClient = kc.KubeClient
// TODO: remove this when we've refactored kubelet to only use clientset.
}

gcPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,
}

daemonEndpoints := &api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: int32(kc.Port)},
}

pc = kc.PodConfig
if pc == nil {
pc = makePodSourceConfig(kc)
}
k, err = kubelet.NewMainKubelet(
kc.Hostname,
kc.NodeName,
kc.DockerClient,
kubeClient,
kc.RootDirectory,
kc.SeccompProfileRoot,
kc.PodInfraContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.EventRecordQPS,
kc.EventBurst,
gcPolicy,
pc.SeenAllSources,
kc.RegisterNode,
kc.RegisterSchedulable,
kc.StandaloneMode,
kc.ClusterDomain,
kc.ClusterDNS,
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.NetworkPlugins,
kc.NetworkPluginName,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
kc.CAdvisorInterface,
kc.ImageGCPolicy,
kc.DiskSpacePolicy,
kc.Cloud,
kc.AutoDetectCloudProvider,
kc.NodeLabels,
kc.NodeStatusUpdateFrequency,
kc.OSInterface,
kc.CgroupsPerQOS,
kc.CgroupRoot,
kc.ContainerRuntime,
kc.RuntimeRequestTimeout,
kc.RktPath,
kc.RktAPIEndpoint,
kc.RktStage1Image,
kc.Mounter,
kc.Writer,
kc.ConfigureCBR0,
kc.NonMasqueradeCIDR,
kc.PodCIDR,
kc.ReconcileCIDR,
kc.MaxPods,
kc.PodsPerCore,
kc.NvidiaGPUs,
kc.DockerExecHandler,
kc.ResolverConfig,
kc.CPUCFSQuota,
daemonEndpoints,
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
kc.OutOfDiskTransitionFrequency,
kc.ExperimentalFlannelOverlay,
kc.NodeIP,
kc.Reservation,
kc.EnableCustomMetrics,
kc.VolumeStatsAggPeriod,
kc.ContainerRuntimeOptions,
kc.HairpinMode,
kc.BabysitDaemons,
kc.EvictionConfig,
kc.Options,
kc.EnableControllerAttachDetach,
)

if err != nil {
return nil, nil, err
}

k.BirthCry()

k.StartGarbageCollection()

return k, pc, nil
}

step1: kubecontainer.ContainerGCPolicy对象创建

step2: api.NodeDaemonEndpoints赋值

step3:NewMainKubelet函数调用,并返回一个Kubelet对象  (重要,该函数类创建了大多数kubelet需要用到的对象)

step4:BirthCry,发送一个事件,通知kubelet已经启动

C:\Go\src\k8s.io\kubernetes\pkg\kubelet\kubelet.go:3556

step5:启动GarbageCollect(垃圾桶收集)的协程

step6:   Kubelet对象返回C:

未完\Go\src\k8s.io\kubernetes\pk\kubelet\kubelet.go

未完待续     


参考大牛的博文:k8s源码分析-----kubelet(1)主要流程

http://blog.csdn.net/screscent/article/details/51086684