【kubernetes/k8s source code analysis】kubelet source code analysis — startup, running, and Pod information processing

Date: 2021-12-28 16:48:35

Introduction

       In a Kubernetes cluster, every Node runs a kubelet process. It handles the tasks the Master assigns to the node and manages the Pods, and the containers inside them, that run there. Each kubelet registers node information with the APIServer, periodically reports resource usage to the Master, and monitors container and node resources through cAdvisor.

Key structures

     type KubeletConfiguration struct — omitted here, as it contains far too many fields to list.

type Pod struct {
    metav1.TypeMeta `json:",inline"`
    // Standard object's metadata.
    // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
    // +optional
    metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

    // Specification of the desired behavior of the pod.
    // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
    // +optional
    Spec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

    // Most recently observed status of the pod.
    // This data may not be up to date.
    // Populated by the system.
    // Read-only.
    // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
    // +optional
    Status PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
      A Pod is a collection of containers running on a single host; it is created by a client and scheduled onto a host.
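      To make the structure concrete, here is a minimal, hypothetical sketch of constructing a Pod object in client code, assuming the standard k8s.io/api/core/v1 and k8s.io/apimachinery packages; it is illustrative only and not taken from the kubelet source.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // A minimal Pod: metadata plus a single-container spec.
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "demo",
            Namespace: "default",
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {Name: "app", Image: "nginx:1.21"},
            },
        },
    }
    // Status is normally populated by the system (kubelet/apiserver), not by clients.
    fmt.Printf("pod %s/%s with %d container(s)\n", pod.Namespace, pod.Name, len(pod.Spec.Containers))
}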

1. Kubelet Startup Flow

The main entry point

The source in cmd/kubelet/kubelet.go is as follows:

func main() {
    s := options.NewKubeletServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if s.ExperimentalDockershim {
        if err := app.RunDockershim(&s.KubeletConfiguration, &s.ContainerRuntimeOptions); err != nil {
            fmt.Fprintf(os.Stderr, "error: %v\n", err)
            os.Exit(1)
        }
    }

    if err := app.Run(s, nil); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}
  • options.NewKubeletServer(): creates a KubeletServer structure with default configuration (see the flag-registration sketch after this list).
  • Initializes flags and logging.
  • verflag.PrintAndExitIfRequested(): checks whether the version flag was requested; if so, it prints the version and exits.
  • Finally, the key call: app.Run(s, nil).
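The s.AddFlags(pflag.CommandLine) pattern above is ordinary spf13/pflag usage. Below is a minimal, self-contained sketch (not kubelet code) of how an options struct typically registers and parses its flags; the demoServer type and its fields are invented for illustration.

package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

// demoServer stands in for options.KubeletServer; the fields are hypothetical.
type demoServer struct {
    KubeConfig    string
    SyncFrequency int
}

// AddFlags binds the struct fields to command-line flags, mirroring s.AddFlags(pflag.CommandLine).
func (s *demoServer) AddFlags(fs *pflag.FlagSet) {
    fs.StringVar(&s.KubeConfig, "kubeconfig", s.KubeConfig, "Path to a kubeconfig file")
    fs.IntVar(&s.SyncFrequency, "sync-frequency", s.SyncFrequency, "Sync interval in seconds")
}

func main() {
    s := &demoServer{KubeConfig: "/var/lib/kubelet/kubeconfig", SyncFrequency: 60}
    s.AddFlags(pflag.CommandLine)
    pflag.Parse()
    fmt.Printf("kubeconfig=%s sync-frequency=%d\n", s.KubeConfig, s.SyncFrequency)
}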

The Run function

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
    // TODO: this should be replaced by a --standalone flag
    standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)

    if s.ExitOnLockContention && s.LockFilePath == "" {
        return errors.New("cannot exit on lock file contention: no lock file specified")
    }

    done := make(chan struct{})
    if s.LockFilePath != "" {
        glog.Infof("acquiring file lock on %q", s.LockFilePath)
        if err := flock.Acquire(s.LockFilePath); err != nil {
            return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
        }
        if s.ExitOnLockContention {
            glog.Infof("watching for inotify events for: %v", s.LockFilePath)
            if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                return err
            }
        }
    }

    // Set feature gates based on the value in KubeletConfiguration
    err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }
     This whole stage is preparatory work: assembling parameters, acquiring the lock file, and setting feature gates (a simplified file-lock sketch follows).
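flock.Acquire above boils down to an advisory file lock. Here is a minimal sketch, assuming Linux and the raw syscall package, of what acquiring such a lock looks like; it illustrates the mechanism rather than the kubelet's actual flock helper.

package main

import (
    "fmt"
    "os"
    "syscall"
)

// acquireFileLock opens (or creates) path and takes an exclusive, non-blocking
// advisory lock on it. A second process calling this on the same path gets EWOULDBLOCK.
func acquireFileLock(path string) (*os.File, error) {
    f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
    if err != nil {
        return nil, err
    }
    if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
        f.Close()
        return nil, fmt.Errorf("unable to acquire file lock on %q: %v", path, err)
    }
    return f, nil
}

func main() {
    f, err := acquireFileLock("/tmp/kubelet-demo.lock")
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    defer f.Close()
    fmt.Println("lock acquired; kubelet-style startup could proceed here")
}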

The UnsecuredKubeletDeps() function

func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
    // Initialize the TLS Options
    tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
    if err != nil {
        return nil, err
    }

    mounter := mount.New(s.ExperimentalMounterPath)
    var writer kubeio.Writer = &kubeio.StdWriter{}
    if s.Containerized {
        glog.V(2).Info("Running kubelet in containerized mode (experimental)")
        mounter = mount.NewNsenterMounter()
        writer = &kubeio.NsenterWriter{}
    }

    var dockerClient libdocker.Interface
    if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
        dockerClient = libdocker.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration,
            s.ImagePullProgressDeadline.Duration)
    } else {
        dockerClient = nil
    }

    return &kubelet.KubeletDeps{
        Auth:               nil, // default does not enforce auth[nz]
        CAdvisorInterface:  nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
        Cloud:              nil, // cloud provider might start background processes
        ContainerManager:   nil,
        DockerClient:       dockerClient,
        KubeClient:         nil,
        ExternalKubeClient: nil,
        Mounter:            mounter,
        NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),
        OOMAdjuster:        oom.NewOOMAdjuster(),
        OSInterface:        kubecontainer.RealOS{},
        Writer:             writer,
        VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),
        TLSOptions:         tlsOptions,
    }, nil
}

  • Initializes the TLS certificates and options.
  • The kubelet may be deployed inside a container. In that case it needs a writer for standard output, and namespace isolation would prevent it from reaching the Docker containers' namespaces. So this code checks whether the kubelet is containerized; if so, it switches to nsenter, which lets the kubelet run commands inside a target namespace (see the nsenter sketch after this list).
  • Returns the KubeletDeps struct.
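For reference, nsenter is a standard util-linux tool. The sketch below shows, assuming nsenter is installed on the host and the process runs as root, how a command can be run inside another process's mount namespace; it only illustrates the mechanism that NsenterMounter/NsenterWriter rely on, not their implementation.

package main

import (
    "fmt"
    "os/exec"
)

// runInMountNamespace runs cmd inside the mount namespace of the process with the
// given PID (PID 1 of the host is the usual target when the kubelet is containerized).
func runInMountNamespace(pid int, cmd ...string) (string, error) {
    args := append([]string{"--target", fmt.Sprint(pid), "--mount", "--"}, cmd...)
    out, err := exec.Command("nsenter", args...).CombinedOutput()
    return string(out), err
}

func main() {
    // Example: list mounts as seen from the host's mount namespace.
    out, err := runInMountNamespace(1, "cat", "/proc/mounts")
    if err != nil {
        fmt.Println("nsenter failed (are you root, and is nsenter installed?):", err)
        return
    }
    fmt.Println(out)
}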


2. Kubelet Running


     Inside RunKubelet, we can see that it eventually calls the startKubelet function:
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
    // ......

    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        startKubelet(k, podCfg, kubeCfg, kubeDeps)
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}
     Running the kubelet starts two main things: k.Run(podCfg.Updates()) enters the main loop, whose input is a channel that continuously delivers the latest pod configuration, and k.ListenAndServe() starts the kubelet's API server.
     The code of Run is as follows:
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

    // init modules, e.g. imageManager, containerManager, oomWatcher, resourceAnalyzer
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        glog.Error(err)
        kl.runtimeState.setInitError(err)
    }

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    // Start a goroutine that periodically reports node status to the APIServer
    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }
    // Start a goroutine that periodically syncs the network status
    go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start loop to sync iptables util rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start goroutine responsible for checking limits in resolv.conf
    if kl.resolverConfig != "" {
        go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
    }

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    // Enter the sync loop that handles pod events pushed from the APIServer; updates is a channel
    kl.syncLoop(updates, kl)
}
    This essentially starts the kubelet's components, each running as its own goroutine (a simplified wait.Until-style sketch follows). The final call, syncLoop, is the main loop that processes all pod updates: it receives pod changes (add, update, delete) and invokes the corresponding handlers so that the containers on the node match the pod specs.
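The periodic goroutines above all use wait.Until from the apimachinery wait package. As a rough, self-contained sketch of that pattern (not the library's actual implementation), a "run f every period until the stop channel closes" loop looks like this:

package main

import (
    "fmt"
    "time"
)

// until calls f every period until stopCh is closed, mimicking the shape of wait.Until.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
    for {
        select {
        case <-stopCh:
            return
        default:
        }
        f()
        select {
        case <-stopCh:
            return
        case <-time.After(period):
        }
    }
}

func main() {
    stop := make(chan struct{})
    go until(func() { fmt.Println("syncing node status...") }, 500*time.Millisecond, stop)
    time.Sleep(2 * time.Second)
    close(stop)
}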
    The syncLoop code is as follows:
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    glog.Info("Starting kubelet main sync loop.")
    // The resyncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    for {
        if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
            glog.Infof("skipping pod synchronization - %v", rs)
            time.Sleep(5 * time.Second)
            continue
        }

        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}
       syncLoop watches for pod changes from three channel sources: file, apiserver, and http. When a change arrives, it calls the corresponding handler so the pod reaches the desired state; when nothing changes, it still periodically reconciles all containers against the latest desired state.
    It calls syncLoopIteration in a loop. Whenever an iteration hits a runtime error, the kubelet records it in runtimeState and waits 5 seconds before continuing the loop (a minimal sketch of this ticker-driven pattern follows).
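Sketched below, under simplifying assumptions (a plain error function instead of runtimeState, and only the resync ticker), is the overall shape of such a loop: check for runtime errors, back off 5 seconds if there are any, otherwise run one iteration per tick.

package main

import (
    "fmt"
    "time"
)

// demoSyncLoop mirrors the structure of Kubelet.syncLoop: skip iterations while the
// runtime reports errors, otherwise run one iteration per tick until it returns false.
func demoSyncLoop(runtimeErrors func() []error, iteration func() bool) {
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    for {
        if errs := runtimeErrors(); len(errs) != 0 {
            fmt.Println("skipping pod synchronization -", errs)
            time.Sleep(5 * time.Second)
            continue
        }
        if !iteration() {
            return
        }
        <-syncTicker.C
    }
}

func main() {
    n := 0
    demoSyncLoop(
        func() []error { return nil },
        func() bool { n++; fmt.Println("iteration", n); return n < 3 },
    )
}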

3. Pod Information Processing

    The syncLoopIteration code is shown below:
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            glog.Errorf("Kubelet does not support snapshot update")
        }

        // Mark the source ready after receiving at least one update from the
        // source. Once all the sources are marked ready, various cleanup
        // routines will start reclaiming resources. It is important that this
        // takes place only after kubelet calls the update handler to process
        // the update to ensure the internal pod cache is up-to-date.
        kl.sourcesReady.AddSource(u.Source)
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
        kl.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            // The liveness manager detected a failure; sync the pod.

            // We should not use the pod from livenessManager, because it is never updated after
            // initialization.
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                // If the pod no longer exists, ignore the update.
                glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                break
            }
            glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready or volume manager has not yet synced the states,
            // skip housekeeping, as we may accidentally delete pods from unready sources.
            glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
            glog.V(4).Infof("SyncLoop (housekeeping)")
            if err := handler.HandlePodCleanups(); err != nil {
                glog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}
     
configCh: dispatches pods from a configuration change to the appropriate handler callback (a small dispatch sketch follows this list). The event types are:
  • kubetypes.ADD
  • kubetypes.UPDATE
  • kubetypes.REMOVE
  • kubetypes.RECONCILE
  • kubetypes.DELETE
  • kubetypes.SET
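To show the dispatch pattern in isolation, here is a self-contained sketch with a simplified podUpdate type (the real kubetypes.PodUpdate carries []*v1.Pod and a richer operation set); all names here are stand-ins, not the kubelet's.

package main

import "fmt"

type podOperation int

const (
    opAdd podOperation = iota
    opUpdate
    opRemove
)

// podUpdate is a stripped-down stand-in for kubetypes.PodUpdate.
type podUpdate struct {
    Op     podOperation
    Pods   []string
    Source string
}

// dispatch mirrors the switch on u.Op in syncLoopIteration.
func dispatch(u podUpdate) {
    switch u.Op {
    case opAdd:
        fmt.Printf("SyncLoop (ADD, %q): %v\n", u.Source, u.Pods)
    case opUpdate:
        fmt.Printf("SyncLoop (UPDATE, %q): %v\n", u.Source, u.Pods)
    case opRemove:
        fmt.Printf("SyncLoop (REMOVE, %q): %v\n", u.Source, u.Pods)
    }
}

func main() {
    configCh := make(chan podUpdate, 1)
    configCh <- podUpdate{Op: opAdd, Pods: []string{"default/nginx"}, Source: "api"}
    close(configCh)
    for u := range configCh {
        dispatch(u)
    }
}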

Let's analyze the case where a user adds a new pod, i.e. handler.HandlePodAdditions(u.Pods):
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        // Always add the pod to the pod manager. Kubelet relies on the pod
        // manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in
        // the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)

        if kubepod.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        if !kl.podIsTerminated(pod) {
            // Only go through the admission process if the pod is not
            // terminated.

            // We failed pods that we rejected, so activePods include all admitted
            // pods that are alive.
            activePods := kl.filterOutTerminatedPods(existingPods)

            // Check if we can admit the pod; if not, reject it.
            if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                kl.rejectPod(pod, reason, message)
                continue
            }
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}
  • Sorts all the pods by creation time, so the earliest-created pod is handled first (see the sorting sketch after this list).
  • Checks whether the pod can be admitted on this node; if not, it is rejected.
  • Hands the pod off to a worker for asynchronous processing.
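sliceutils.PodsByCreationTime is essentially a sort.Interface over the pods' creation timestamps. A minimal equivalent using sort.Slice (illustrative, with a simplified pod type rather than *v1.Pod) could look like this:

package main

import (
    "fmt"
    "sort"
    "time"
)

// demoPod stands in for *v1.Pod; only the creation timestamp matters here.
type demoPod struct {
    Name              string
    CreationTimestamp time.Time
}

func main() {
    now := time.Now()
    pods := []demoPod{
        {Name: "web", CreationTimestamp: now},
        {Name: "db", CreationTimestamp: now.Add(-time.Hour)},
        {Name: "cache", CreationTimestamp: now.Add(-time.Minute)},
    }
    // Oldest first, matching sort.Sort(sliceutils.PodsByCreationTime(pods)).
    sort.Slice(pods, func(i, j int) bool {
        return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
    })
    for _, p := range pods {
        fmt.Println(p.Name)
    }
}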


The dispatchWork code is shown below:
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
                metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
            }
        },
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

     dispatchWork's main job is to wrap the received arguments into UpdatePodOptions and pass them to kl.podWorkers.UpdatePod:
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    p.podLock.Lock()
    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        // We need to have a buffer here, because checkForUpdates() method that
        // puts an update into channel is called from the same goroutine where
        // the channel is consumed. However, it is guaranteed that in such case
        // the channel is empty, so buffer of size 1 is enough.
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // Creating a new pod worker either means this is a new pod, or that the
        // kubelet just restarted. In either case the kubelet is willing to believe
        // the status of the pod for the first pod worker sync. See corresponding
        // comment in syncPod.
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}
     It checks the podUpdates map; since this is a newly created pod, no worker exists for it yet, so a buffered channel is created and a goroutine running managePodLoop is started (a simplified per-pod worker sketch follows).
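The per-pod worker is a common "one goroutine and one channel per key" pattern. Here is a self-contained sketch of that pattern with simplified types (string keys, string work items); it mirrors the structure of podWorkers but is not the kubelet's code.

package main

import (
    "fmt"
    "sync"
    "time"
)

type workers struct {
    mu      sync.Mutex
    updates map[string]chan string // one buffered channel per pod UID
}

func (w *workers) UpdatePod(uid, work string) {
    w.mu.Lock()
    defer w.mu.Unlock()
    ch, exists := w.updates[uid]
    if !exists {
        ch = make(chan string, 1)
        w.updates[uid] = ch
        // First update for this pod: start its dedicated worker goroutine.
        go func() {
            for item := range ch {
                fmt.Printf("pod %s: syncing %q\n", uid, item)
            }
        }()
    }
    // For brevity this sketch blocks on a full buffer; the real podWorkers instead
    // parks the update in lastUndeliveredWorkUpdate while the worker is busy.
    ch <- work
}

func main() {
    w := &workers{updates: map[string]chan string{}}
    w.UpdatePod("uid-1", "create")
    w.UpdatePod("uid-2", "create")
    w.UpdatePod("uid-1", "update")
    time.Sleep(100 * time.Millisecond) // let the workers print
}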


   
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
            update.OnCompleteFunc(err)
        }
        if err != nil {
            glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
            // if we failed sync, we throw more specific events for why it happened.
            // as a result, i question the value of this event.
            // TODO: determine if we can remove this in a future release.
            // do not include descriptive text that can vary on why it failed so in a pathological
            // scenario, kubelet does not create enough discrete events that miss default aggregation
            // window.
            p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "Error syncing pod")
        }
        p.wrapUp(update.Pod.UID, err)
    }
}
      managePodLoop calls syncPodFn to sync the pod; syncPodFn is in fact the Kubelet's syncPod function.

    syncPod does quite a lot, so let's walk through the easier parts. makePodDataDirs creates the pod's data directories — the pod directory, the volumes directory, and the plugins directory — under /var/lib/kubelet/<pod UID>:
func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
    uid := pod.UID
    if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
        return err
    }
    if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
        return err
    }
    if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
        return err
    }
    return nil
}

     WaitForAttachAndMount waits until all volumes the pod depends on are attached and mounted:
func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
    expectedVolumes := getExpectedVolumes(pod)
    if len(expectedVolumes) == 0 {
        // No volumes to verify
        return nil
    }

    glog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
    uniquePodName := volumehelper.GetUniquePodName(pod)

    // Some pods expect to have Setup called over and over again to update.
    // Remount plugins for which this is true. (Atomically updating volumes,
    // like Downward API, depend on this to update the contents of the volume).
    vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
    vm.actualStateOfWorld.MarkRemountRequired(uniquePodName)

    err := wait.Poll(
        podAttachAndMountRetryInterval,
        podAttachAndMountTimeout,
        vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))

    if err != nil {
        // Timeout expired
        unmountedVolumes := vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
        if len(unmountedVolumes) == 0 {
            return nil
        }

        return fmt.Errorf(
            "timeout expired waiting for volumes to attach/mount for pod %q/%q. list of unattached/unmounted volumes=%v",
            pod.Namespace,
            pod.Name,
            unmountedVolumes)
    }

    glog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
    return nil
}
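The wait.Poll call above is a "retry a condition at an interval until it succeeds or a timeout expires" helper. A self-contained sketch of that polling pattern (not the apimachinery implementation) looks roughly like this:

package main

import (
    "errors"
    "fmt"
    "time"
)

// poll runs condition every interval until it returns true, errors, or timeout elapses,
// loosely mirroring wait.Poll(interval, timeout, condition).
func poll(interval, timeout time.Duration, condition func() (bool, error)) error {
    deadline := time.Now().Add(timeout)
    for {
        ok, err := condition()
        if err != nil {
            return err
        }
        if ok {
            return nil
        }
        if time.Now().After(deadline) {
            return errors.New("timed out waiting for the condition")
        }
        time.Sleep(interval)
    }
}

func main() {
    mounted := 0
    err := poll(100*time.Millisecond, 2*time.Second, func() (bool, error) {
        mounted++ // pretend one more volume gets mounted on each attempt
        return mounted >= 3, nil
    })
    fmt.Println("all volumes mounted:", err == nil)
}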


    If the pod references image pull secrets, the kubelet fetches the corresponding secret data from the apiserver:
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
    pullSecrets := []v1.Secret{}

    for _, secretRef := range pod.Spec.ImagePullSecrets {
        secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
        if err != nil {
            glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
            continue
        }

        pullSecrets = append(pullSecrets, *secret)
    }

    return pullSecrets
}
 
     Eventually containerRuntime.SyncPod is called to create the containers; the function lives in pkg/kubelet/kuberuntime/kuberuntime_manager.go:
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodContainerChanges(pod, podStatus)
    glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(api.Scheme, pod)
        if err != nil {
            glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, "SandboxChanged", "Pod sandbox changed, it will be killed and re-created.")
        } else {
            glog.V(4).Infof("SyncPod received new pod %q, will create a new sandbox for it", format.Pod(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {
        if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {
            glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
        } else {
            glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                return
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    m.pruneInitContainersBeforeStart(pod, podStatus, podContainerChanges.InitContainersToKeep)

    // We pass the value of the podIP down to generatePodSandboxConfig and
    // generateContainerConfig, which in turn passes it to various other
    // functions, in order to facilitate functionality that requires this
    // value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet.
    //
    // We default to the IP in the passed-in pod status, and overwrite it if the
    // sandbox needs to be (re)started.
    podIP := ""
    if podStatus != nil {
        podIP = podStatus.IP
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {
        var msg string
        var err error

        glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
            return
        }
        glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
            podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
            glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
        }
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        glog.Error(message)
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Step 5: start init containers.
    status, next, done := findNextInitContainerToRun(pod, podStatus)
    if status != nil && status.ExitCode != 0 {
        // container initialization has failed, flag the pod as failed
        initContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)
        initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))
        result.AddSyncResult(initContainerResult)
        if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
            utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %#v", format.Pod(pod), status.Name, status))
            return
        }
        utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %#v", format.Pod(pod), status.Name, status))
    }
    if next != nil {
        if len(podContainerChanges.ContainersToStart) == 0 {
            glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))
            return
        }

        // If we need to start the next container, do so now then exit
        container := next
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
            return
        }

        glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
            return
        }

        // Successfully started the container; clear the entry in the failure
        glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
        return
    }
    if !done {
        // init container still running
        glog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))
        return
    }
    if podContainerChanges.InitFailed {
        glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))
        return
    }

    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
            continue
        }

        glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
            continue
        }
    }

    return
}
  1. Compute sandbox and container changes.
  2. Kill pod sandbox if necessary.
  3. Kill any containers that should not be running.
  4. Create sandbox if necessary.
  5. Create init containers.
  6. Create normal containers (both init and regular container starts are gated by the doBackOff restart-backoff check; a simplified backoff sketch follows this list).
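The doBackOff check in steps 5 and 6 consults a per-container exponential backoff keyed by pod and container, so crash-looping containers are not restarted immediately. A self-contained sketch of that idea (simplified types, not the flowcontrol.Backoff implementation) is below:

package main

import (
    "fmt"
    "time"
)

// backoff tracks, per key, when the next restart is allowed and the current delay.
type backoff struct {
    next  map[string]time.Time
    delay map[string]time.Duration
    base  time.Duration
    max   time.Duration
}

func newBackoff(base, max time.Duration) *backoff {
    return &backoff{next: map[string]time.Time{}, delay: map[string]time.Duration{}, base: base, max: max}
}

// shouldStart reports whether the container identified by key may be started now.
// A denied start changes nothing; an allowed start doubles the delay (capped at max).
func (b *backoff) shouldStart(key string, now time.Time) bool {
    if now.Before(b.next[key]) {
        return false // still backing off
    }
    d := b.delay[key]
    if d == 0 {
        d = b.base
    } else {
        d *= 2
        if d > b.max {
            d = b.max
        }
    }
    b.delay[key] = d
    b.next[key] = now.Add(d)
    return true
}

func main() {
    b := newBackoff(time.Second, 10*time.Second)
    key := "default/nginx/app" // roughly: namespace/pod/container
    now := time.Now()
    fmt.Println(b.shouldStart(key, now))                           // true: first start
    fmt.Println(b.shouldStart(key, now.Add(500*time.Millisecond))) // false: inside the backoff window
    fmt.Println(b.shouldStart(key, now.Add(2*time.Second)))        // true: window elapsed, delay doubles
}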

Let's focus on step 4, "Create sandbox if necessary". The main function, createPodSandbox, lives in pkg/kubelet/kuberuntime/kuberuntime_sandbox.go and primarily calls RunPodSandbox, located in pkg/kubelet/dockershim/docker_sandbox.go:
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        glog.Error(message)
        return "", message, err
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
        message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
        glog.Errorf(message)
        return "", message, err
    }

    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
    if err != nil {
        message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
        glog.Error(message)
        return "", message, err
    }

    return podSandBoxID, "", nil
}


func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id string, err error) {
    // Step 1: Pull the image for the sandbox.
    image := defaultSandboxImage
    podSandboxImage := ds.podSandboxImage
    if len(podSandboxImage) != 0 {
        image = podSandboxImage
    }

    // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
    // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
    // Only pull sandbox image when it's not present - v1.PullIfNotPresent.
    if err := ensureSandboxImageExists(ds.client, image); err != nil {
        return "", err
    }

    // Step 2: Create the sandbox container.
    createConfig, err := ds.makeSandboxDockerConfig(config, image)
    if err != nil {
        return "", fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
    }
    createResp, err := ds.client.CreateContainer(*createConfig)
    if err != nil {
        createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
    }

    if err != nil || createResp == nil {
        return "", fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
    }

    ds.setNetworkReady(createResp.ID, false)
    defer func(e *error) {
        // Set networking ready depending on the error return of
        // the parent function
        if *e == nil {
            ds.setNetworkReady(createResp.ID, true)
        }
    }(&err)

    // Step 3: Create Sandbox Checkpoint.
    if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
        return createResp.ID, err
    }

    // Step 4: Start the sandbox container.
    // Assume kubelet's garbage collector would remove the sandbox later, if
    // startContainer failed.
    err = ds.client.StartContainer(createResp.ID)
    if err != nil {
        return createResp.ID, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
    }

    // Rewrite resolv.conf file generated by docker.
    // NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
    // not only for pods with host network: the resolver conf will be overwritten
    // after sandbox creation to override docker's behaviour. This resolv.conf
    // file is shared by all containers of the same pod, and needs to be modified
    // only once per pod.
    if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
        containerInfo, err := ds.client.InspectContainer(createResp.ID)
        if err != nil {
            return createResp.ID, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
        }

        if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
            return createResp.ID, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
        }
    }

    // Do not invoke network plugins if in hostNetwork mode.
    if nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions(); nsOptions != nil && nsOptions.HostNetwork {
        return createResp.ID, nil
    }

    // Step 5: Setup networking for the sandbox.
    // All pod networking is setup by a CNI plugin discovered at startup time.
    // This plugin assigns the pod ip, sets up routes inside the sandbox,
    // creates interfaces etc. In theory, its jurisdiction ends with pod
    // sandbox networking, but it might insert iptables rules or open ports
    // on the host as well, to satisfy parts of the pod spec that aren't
    // recognized by the CNI standard yet.
    cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
    err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
    if err != nil {
        // TODO(random-liu): Do we need to teardown network here?
        if err := ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod); err != nil {
            glog.Warningf("Failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)
        }
    }
    return createResp.ID, err
}

  • The default sandbox image is gcr.io/google_containers/pause-amd64:3.0.
  • ensureSandboxImageExists pulls the sandbox image only when it is not already present.
  • CreateContainer ultimately calls the Docker API with POST /containers/create.
  • CreateCheckpoint writes a checkpoint file whose name is the container ID.
  • StartContainer ultimately calls the Docker API with POST /containers/{containerID}/start.
  • The resolv.conf generated by Docker is rewritten; it is shared by all containers in the pod.
  • InspectContainer ultimately calls the Docker API with GET /containers/{containerID}/json.
  • Networking for the container is set up through CNI: a loopback interface is created, the bridge is put into promiscuous mode (with commands along the lines of ip link show dev <bridge> / ip link set <bridge> promisc on), and outbound traffic is SNATed via iptables rule chains. The kubenet CNI config template used here is shown below, followed by a sketch of how it is filled in.
const NET_CONFIG_TEMPLATE = `{
  "cniVersion": "0.1.0",
  "name": "kubenet",
  "type": "bridge",
  "bridge": "%s",
  "mtu": %d,
  "addIf": "%s",
  "isGateway": true,
  "ipMasq": false,
  "hairpinMode": %t,
  "ipam": {
    "type": "host-local",
    "subnet": "%s",
    "gateway": "%s",
    "routes": [
      { "dst": "0.0.0.0/0" }
    ]
  }
}`
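The %s/%d/%t placeholders in this template are filled with the bridge name, MTU, attached interface, hairpin mode, pod CIDR, and gateway. A sketch of rendering such a config string (all values below are made up for illustration):

package main

import "fmt"

const netConfigTemplate = `{
  "cniVersion": "0.1.0",
  "name": "kubenet",
  "type": "bridge",
  "bridge": "%s",
  "mtu": %d,
  "addIf": "%s",
  "isGateway": true,
  "ipMasq": false,
  "hairpinMode": %t,
  "ipam": {
    "type": "host-local",
    "subnet": "%s",
    "gateway": "%s",
    "routes": [ { "dst": "0.0.0.0/0" } ]
  }
}`

func main() {
    // Hypothetical values: bridge cbr0, MTU 1500, default interface eth0,
    // hairpin mode on, and a per-node pod CIDR with its gateway.
    conf := fmt.Sprintf(netConfigTemplate, "cbr0", 1500, "eth0", true, "10.244.1.0/24", "10.244.1.1")
    fmt.Println(conf)
}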