Docker基础之container start命令

时间:2025-05-15 08:23:21

一、postContainersStart

daemon的路由定义如下:

router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),

查看handler函数postContainersStart得知,postContainersStart的执行步骤如下:
1、 解析request
2、调用daemon启动ContainerStart()、(vars[“name”], hostConfig, checkpoint, checkpointDir)

func (s *containerRouter) postContainersStart(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	version := httputils.VersionFromContext(ctx)
	var hostConfig *container.HostConfig
	//一个非nil json对象至少有7个字符。
	if r.ContentLength > 7 || r.ContentLength == -1 {
		if versions.GreaterThanOrEqualTo(version, "1.24") {
			return validationError{fmt.Errorf("starting container with non-empty request body was deprecated since v1.10 and removed in v1.12")}
		}
		if err := httputils.CheckForJSON(r); err != nil {
			return err
		}
		c, err := s.decoder.DecodeHostConfig(r.Body)
		if err != nil {
			return err
		}
		hostConfig = c
	}
	if err := httputils.ParseForm(r); err != nil {
		return err
	}

	checkpoint := r.Form.Get("checkpoint")
	checkpointDir := r.Form.Get("checkpoint-dir")
	//参数解析完毕之后
	if err := s.backend.ContainerStart(vars["name"], hostConfig, checkpoint, checkpointDir); err != nil {
		return err
	}
	w.WriteHeader(http.StatusNoContent)
	return nil
}

二、ContainerStart

1、根据名称(ID)获取容器对象
2、验证配置
3、开始启动container,(container, checkpoint, checkpointDir, true)

//ContainerStart启动容器。
func (daemon *Daemon) ContainerStart(name string, hostConfig *containertypes.HostConfig, checkpoint string, checkpointDir string) error {
	if checkpoint != "" && !daemon.HasExperimental() {
		//checkpoint 仅仅支持在experimental mode下使用
		return apierrors.NewBadRequestError(fmt.Errorf("checkpoint is only supported in experimental mode"))
	}
	//根据名称(ID)获取容器对象
	container, err := daemon.GetContainer(name)
	if err != nil {
		return err
	}
	if container.IsPaused() {
		//paused状态
		return fmt.Errorf("Cannot start a paused container, try unpause instead.")
	}
	if container.IsRunning() {
		//running状态
		err := fmt.Errorf("Container already started")
		return apierrors.NewErrorWithStatusCode(err, http.StatusNotModified)
	}
	// Windows在这里没有向后兼容性问题。非Windows系统中,为了保持兼容性,hostconfig应该在create的时候被传给该container,而不是在start期间
	if runtime.GOOS != "windows" {
		// 这是为了向后兼容——hostconfig应该在创建容器时传递,而不是在开始时。
		if hostConfig != nil {
			logrus.Warn("DEPRECATED: Setting host configuration options when the container starts is deprecated and has been removed in Docker 1.12")
			oldNetworkMode := container.HostConfig.NetworkMode
			if err := daemon.setSecurityOptions(container, hostConfig); err != nil {
				return err
			}
			if err := daemon.mergeAndVerifyLogConfig(&hostConfig.LogConfig); err != nil {
				return err
			}
			if err := daemon.setHostConfig(container, hostConfig); err != nil {
				return err
			}
			newNetworkMode := container.HostConfig.NetworkMode
			if string(oldNetworkMode) != string(newNetworkMode) {
				// 如果用户在启动时改变了网络模式,请清理旧网络。这是一个已弃用的特性,已在Docker 1.12中删除
				container.NetworkSettings.Networks = nil
				if err := container.ToDisk(); err != nil {
					return err
				}
			}
			container.InitDNSHostConfig()
		}
	} else {
		if hostConfig != nil {
			return fmt.Errorf("Supplying a hostconfig on start is not supported. It should be supplied on create")
		}
	}
	// 检查hostConfig是否与当前系统设置一致。它可能发生cgroups被umounted或类似的情况。
	if _, err = daemon.verifyContainerSettings(container.HostConfig, nil, false); err != nil {
		return err
	}
	// 适应旧的容器,以防我们在这个函数中有更新,旧的容器永远没有机会在创建阶段调用新函数。
	if hostConfig != nil {
		if err := daemon.adaptContainerSettings(container.HostConfig, false); err != nil {
			return err
		}
	}
	//开始启动container
	return daemon.containerStart(container, checkpoint, checkpointDir, true)
}

1、关键函数containerStart
()函数是docker start中的核心部分,其工作流程如下:
1、设置容器的文件系统,/var/lib/docker/overlay/{}/merged
2、 设置容器的网络模式,调用了libnetwork,即CNM模型
3、 创建/proc /dev等spec文件,对容器所特有的属性都进行设置,例如:资源限制,命名空间,安全模式等等配置信息
4、 调用containerd进行Create容器

// containerStart通过设置容器所需的一切,比如存储和网络,以及容器之间的链接,来准备容器运行。容器将等待开始运行的信号。
func (daemon *Daemon) containerStart(container *container.Container, checkpoint string, checkpointDir string, resetRestartManager bool) (err error) {
	start := time.Now()
	//给准备启动的container上锁
	container.Lock()
	defer container.Unlock()
	if resetRestartManager && container.Running { // 如果已经在重启步骤中,则跳过此检查,并且resetRestartManager==false
		return nil
	}
	if container.RemovalInProgress || container.Dead {
		return fmt.Errorf("Container is marked for removal and cannot be started.")
	}
	// 启动失败之后的clean up操作
	defer func() {
		if err != nil {
			container.SetError(err)
			// 如果没有其他人设置它,确保我们不会让它为零
			if container.ExitCode() == 0 {
				container.SetExitCode(128)
			}
			container.ToDisk()
			container.Reset(false)
			daemon.Cleanup(container)
			// 如果容器设置了AutoRemove标志,在清理后删除它
			if container.HostConfig.AutoRemove {
				container.Unlock()
				if err := daemon.ContainerRm(container.ID, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
					logrus.Errorf("can't remove container %s: %v", container.ID, err)
				}
				container.Lock()
			}
		}
	}()
	//设置container的BaseFS为该container的RWLayer的挂载点path
	if err := daemon.conditionalMountOnStart(container); err != nil {
		return err
	}
	// 确保NetworkMode有一个可接受的值。我们这样做是为了确保API向后兼容性。
	container.HostConfig = runconfig.SetDefaultNetModeIfBlank(container.HostConfig)

	//初始化容器的网络,默认模式bridge:同一个host主机上容器的通信通过Linux bridge进行与宿主机外部网络的通信需要通过宿主机端口进行NAT
	if err := daemon.initializeNetworking(container); err != nil {
		return err
	}

	//对容器所特有的属性都进行设置,例如:资源限制,命名空间,安全模式等等配置信息
	spec, err := daemon.createSpec(container)
	if err != nil {
		return err
	}
	createOptions, err := daemon.getLibcontainerdCreateOptions(container)
	if err != nil {
		return err
	}
	if resetRestartManager {
		container.ResetRestartManager(true)
	}

	if checkpointDir == "" {
		checkpointDir = container.CheckpointDir()
	}
	//前面所有的参数等设置都是为了这里调用containerd服务
	if err := daemon.containerd.Create(container.ID, checkpoint, checkpointDir, *spec, container.InitializeStdio, createOptions...); err != nil {
		errDesc := grpc.ErrorDesc(err)
		contains := func(s1, s2 string) bool {
			return strings.Contains(strings.ToLower(s1), s2)
		}
		logrus.Errorf("Create container failed with error: %s", errDesc)
		// 如果我们从容器的初始开始接收到一个内部错误,那么让我们返回它,而不是输入重新启动循环设置为127,因为容器CMD不存在
		if contains(errDesc, container.Path) &&
			(contains(errDesc, "executable file not found") ||
				contains(errDesc, "no such file or directory") ||
				contains(errDesc, "system cannot find the file specified")) {
			container.SetExitCode(127)
		}
		if contains(errDesc, syscall.EACCES.Error()) {
			container.SetExitCode(126)
		}
		// 试图将一个文件挂载到一个目录上,或将一个目录挂载到一个文件上,可能是从用户指定的绑定挂载
		if contains(errDesc, syscall.ENOTDIR.Error()) {
			errDesc += ": Are you trying to mount a directory onto a file (or vice-versa)? Check if the specified host path exists and is the expected type"
			container.SetExitCode(127)
		}
		return fmt.Errorf("%s", errDesc)
	}
	containerActions.WithValues("start").UpdateSince(start)
	return nil
}

三、设置文件系统

1、设置container的BaseFS为该container的RWLayer的挂载点path:/var/lib/docker/overlay/{}/merged
2、BaseFS 是该container整个文件系统的路径。
3、 docker exec -it {} bin/bash进去看到的视图,就是/var/lib/docker/overlay/{}/merged下的视图
4、 也就是说,后面在容器里面做的所有修改都会实时反应到目录/var/lib/docker/overlay/{}/merged。 在外面所做的修改也就实时反馈到容器里面。

// conditionalMountOnStart是容器开始调用mount期间的一个特定于平台的帮助函数。
func (daemon *Daemon) conditionalMountOnStart(container *container.Container) error {
	return daemon.Mount(container)
}
// 挂载这个容器
func (daemon *Daemon) Mount(container *container.Container) error {
	//()把该RWLayer进行挂载,然后return 路径给调用者
	dir, err := container.RWLayer.Mount(container.GetMountLabel())
	if err != nil {
		return err
	}
	logrus.Debugf("container mounted via layerStore: %v", dir)
	//容器第一次启动的时候, = ""
	if container.BaseFS != dir {
		// 图形驱动程序报告的挂载路径在Windows上应该始终是可信的,因为给定挂载层的卷路径可能会随时间变化。这应该只在非windows操作系统上是一个错误。
		if container.BaseFS != "" && runtime.GOOS != "windows" {
			daemon.Unmount(container)
			return fmt.Errorf("Error: driver %s is returning inconsistent paths for container %s ('%s' then '%s')",
				daemon.GraphDriverName(), container.ID, container.BaseFS, dir)
		}
	}
	container.BaseFS = dir 
	return nil
}
func (rl *referencedRWLayer) Mount(mountLabel string) (string, error) {
	//依靠和mountLabel获取一个container整个文件系统的路径
	return rl.layerStore.driver.Get(rl.mountedLayer.mountID, mountLabel)
}

1、 overlay

// Get函数为给定id创建并挂载所需的文件系统,并返回挂载路径。
func (d *Driver) Get(id string, mountLabel string) (s string, err error) {
	//入参id 表示一个layer所在的目录路径 /var/lib/docker/overlsy/{id}
	dir := d.dir(id)
	if _, err := os.Stat(dir); err != nil {
		return "", err
	}
	// 如果id有一个根,就返回它
	rootDir := path.Join(dir, "root")
	if _, err := os.Stat(rootDir); err == nil {
		return rootDir, nil
	}
	mergedDir := path.Join(dir, "merged")
	//增加一次一个layer的引用次数
	if count := d.ctr.Increment(mergedDir); count > 1 {
		return mergedDir, nil
	}
	defer func() {
		if err != nil {
			if c := d.ctr.Decrement(mergedDir); c <= 0 {
				syscall.Unmount(mergedDir, 0)
			}
		}
	}()
	lowerID, err := ioutil.ReadFile(path.Join(dir, "lower-id"))
	if err != nil {
		return "", err
	}
	var (
		lowerDir = path.Join(d.dir(string(lowerID)), "root")
		upperDir = path.Join(dir, "upper")
		workDir  = path.Join(dir, "work")
		opts     = fmt.Sprintf("lowerdir=%s,upperdir=%s,workdir=%s", lowerDir, upperDir, workDir)
	)
	//调用 
	if err := syscall.Mount("overlay", mergedDir, "overlay", 0, label.FormatMountLabel(opts, mountLabel)); err != nil {
		return "", fmt.Errorf("error creating overlay mount to %s: %v", mergedDir, err)
	}
	// chown "workdir/work"到重新映射的根UID/GID。在用户命名空间内覆盖fs要求将目录从下往上移动。
	rootUID, rootGID, err := idtools.GetRootUIDGID(d.uidMaps, d.gidMaps)
	if err != nil {
		return "", err
	}
	if err := os.Chown(path.Join(workDir, "work"), rootUID, rootGID); err != nil {
		return "", err
	}
	return mergedDir, nil
}

四、设置网络模式

//docker 4种网络模式:bridge hots none container
func (daemon *Daemon) initializeNetworking(container *container.Container) error {
	var err error
//如果网络模式是 container模式,获取指定容器的网络配置,join in
	if container.HostConfig.NetworkMode.IsContainer() {
		//我们需要从容器中获取要联接的主机文件
		nc, err := daemon.getNetworkedContainer(container.ID, container.HostConfig.NetworkMode.ConnectedContainer())
		if err != nil {
			return err
		}
		initializeNetworkingPaths(container, nc)
		container.Config.Hostname = nc.Config.Hostname
		container.Config.Domainname = nc.Config.Domainname
		return nil
	}
	//如果网络模式是 host模式,与宿主机共享ip仅需设置
	if container.HostConfig.NetworkMode.IsHost() {
		if container.Config.Hostname == "" {
			container.Config.Hostname, err = os.Hostname()
			if err != nil {
				return err
			}
		}
	}
	//分配网络资源
	if err := daemon.allocateNetwork(container); err != nil {
		return err
	}

	//把前面对container的hostname设置持久化,写入到/var/lib/docker/containers/{}/hostname			
	return container.BuildHostnameFile()
}

1、allocateNetwork()

//下面函数主要实现网络服务定位
func (daemon *Daemon) allocateNetwork(container *container.Container) error {
	start := time.Now()
	//daemon的网络controller,其类型是
	controller := daemon.netController
	if daemon.netController == nil {
		return nil
	}
	//清理一些残留无用的sandbox盒,sandbox就是一个容器的网络栈,相当于一个Network namespace
	if err := controller.SandboxDestroy(container.ID); err != nil {
		logrus.Errorf("failed to cleanup up stale network sandbox for container %s", container.ID)
	}

	updateSettings := false
	if len(container.NetworkSettings.Networks) == 0 {
		if container.Config.NetworkDisabled || container.HostConfig.NetworkMode.IsContainer() {
			return nil
		}
		//更新容器的网络设置,其实就是根据容器网络模式来更新的映射关系,就是网络名和endpoint的关系
		daemon.updateContainerNetworkSettings(container, nil)
		updateSettings = true
	}
	// 总是先连接默认网络,因为只有默认网络模式支持链接,我们需要对沙箱初始化链接做一些设置,但沙箱只在第一次网络连接时初始化。
	defaultNetName := runconfig.DefaultDaemonNetworkMode().NetworkName()
	if nConf, ok := container.NetworkSettings.Networks[defaultNetName]; ok {
		cleanOperationalData(nConf)
		//第一次尝试连接默认的网络名,同时可以完成沙盒的初始化
		if err := daemon.connectToNetwork(container, defaultNetName, nConf.EndpointSettings, updateSettings); err != nil {
			return err
		}
	}
	networks := make(map[string]*network.EndpointSettings)
	for n, epConf := range container.NetworkSettings.Networks {
		if n == defaultNetName {
			continue
		}
		networks[n] = epConf
	}
	for netName, epConf := range networks {
		cleanOperationalData(epConf)
		//将所有endpoint连接到网络
		if err := daemon.connectToNetwork(container, netName, epConf.EndpointSettings, updateSettings); err != nil {
			return err
		}
	}
	//持久化容器的hostconfig信息,/var/lib/docker/containers/{id}/
	if err := container.WriteHostConfig(); err != nil {
		return err
	}
	networkActions.WithValues("allocate").UpdateSince(start)
	return nil
}

五、调用containerd进行Create容器

调用libcontainerd模块

func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	//获取libcontainerd模块中的containers,这里面的容器应该包括正在运行的容器
	if _, err := clnt.getContainer(containerID); err == nil {
		return fmt.Errorf("Container %s is already active", containerID)
	}
//获取gid和uid
	uid, gid, err := getRootIDs(specs.Spec(spec))
	if err != nil {
		return err
	}
	//迭代创建 statedir 所需目录,该目录路径在定义libcontainerd模块的时候已经指定
	dir, err := clnt.prepareBundleDir(uid, gid)
	if err != nil {
		return err
	}
//创建一个containerCommon对象
	container := clnt.newContainer(filepath.Join(dir, containerID), options...)
	if err := container.clean(); err != nil {
		return err
	}
	defer func() {
		if err != nil {
			container.clean()
			clnt.deleteContainer(containerID)
		}
	}()
//创建容器目录
	if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
		return err
	}
//创建配置文件路径,并根据spec创建配置文件
	f, err := os.Create(filepath.Join(container.dir, configFilename))
	if err != nil {
		return err
	}
	defer f.Close()
	if err := json.NewEncoder(f).Encode(spec); err != nil {
		return err
	}
//容器启动
	return container.start(checkpoint, checkpointDir, attachStdio)
}

1、()

//func (ctr *container) start配置containerd创建容器的标准输入输出端及请求数据的设置,并发送请求
func (ctr *container) start(checkpoint string, checkpointDir string, attachStdio StdioCallback) (err error) {
	spec, err := ctr.spec() //根据读取的配置文件信息设置spec对象
	if err != nil {
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ready := make(chan struct{})
	fifoCtx, cancel := context.WithCancel(context.Background())
	defer func() {
		if err != nil {
			cancel()
		}
	}()
//创建一个先进先出的io管道对象,包括可读端和可写端
	iopipe, err := ctr.openFifos(fifoCtx, spec.Process.Terminal)
	if err != nil {
		return err
	}
	var stdinOnce sync.Once

	// 我们需要在容器启动后延迟stdin关闭,否则“stdin close”事件将被containerd拒绝。stdin闭包在attachStdio中发生
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // 在附加错误时,我们不知道stdin是否已经关闭
			err = stdin.Close()
			go func() {
				select {
				case <-ready:
				case <-ctx.Done():
				}
				select {
				case <-ready:
					if err := ctr.sendCloseStdin(); err != nil {
						logrus.Warnf("failed to close stdin: %+v", err)
					}
				default:
				}
			}()
		})
		return err
	})
//定义对containerd模块的请求对象
	r := &containerd.CreateContainerRequest{
		Id:            ctr.containerID,
		BundlePath:    ctr.dir,
		Stdin:         ctr.fifo(syscall.Stdin),
		Stdout:        ctr.fifo(syscall.Stdout),
		Stderr:        ctr.fifo(syscall.Stderr),
		Checkpoint:    checkpoint,
		CheckpointDir: checkpointDir,
		//检查我们是否在ramdisk中运行以禁用枢轴根
		NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
		Runtime:     ctr.runtime,
		RuntimeArgs: ctr.runtimeArgs,
	}
	ctr.client.appendContainer(ctr)
	if err := attachStdio(*iopipe); err != nil {
		ctr.closeFifos(iopipe)
		return err
	}
//向containerd模块发送请求数据,实现容器创建grpc调用
	resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r)
	if err != nil {
		ctr.closeFifos(iopipe)
		return err
	}
	ctr.systemPid = systemPid(resp.Container)
	close(ready)
//启动成功后更新daemon中的容器状态
	return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State: StateStart,
			Pid:   ctr.systemPid,
		}})
}

2、发送grpc请求

func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
	out := new(CreateContainerResponse)
	err := grpc.Invoke(ctx, "//CreateContainer", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// Invoke在网络上发送RPC请求,并在接收到响应后返回。
//调用由生成的代码调用。用户也可以在用例中真正需要时直接调用Invoke。
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
	return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
	//检查、设置参数
	c := defaultCallInfo
	for _, o := range opts {
		if err := o.before(&c); err != nil {
			return toRPCErr(err)
		}
	}
	defer func() {
		for _, o := range opts {
			o.after(&c)
		}
	}()
	if EnableTracing {
		c.traceInfo.tr = trace.New("."+methodFamily(method), method)
		defer c.traceInfo.tr.Finish()
		c.traceInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
		}
		c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
		defer func() {
			if err != nil {
				c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				c.traceInfo.tr.SetError()
			}
		}()
	}
	topts := &transport.Options{
		Last:  true,
		Delay: false,
	}
	for {
		var (
			err    error
			t      transport.ClientTransport
			stream *transport.Stream
			// 从(…)记录put处理程序。一旦RPC完成或失败,它就被调用。
			put func()
		)
		// 发送请求的目的地址:Host和路由:
		callHdr := &transport.CallHdr{
			Host:   cc.authority,
			Method: method,
		}
		if cc.dopts.cp != nil {
			callHdr.SendCompress = cc.dopts.cp.Type()
		}
		gopts := BalancerGetOptions{
			BlockingWait: !c.failFast,
		}
		t, put, err = cc.getTransport(ctx, gopts)
		if err != nil {
			//可能会重新讨论错误处理。
			if _, ok := err.(*rpcError); ok {
				return err
			}
			if err == errConnClosing || err == errConnUnavailable {
				if c.failFast {
					return Errorf(codes.Unavailable, "%v", err)
				}
				continue
			}
			//所有其他错误都被视为内部错误`	
			return Errorf(codes.Internal, "%v", err)
		}
		if c.traceInfo.tr != nil {
			c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
		}
		//发送Request
		stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
		if err != nil {
			if put != nil {
				put()
				put = nil
			}
			//重试一个非failfast RPC时
			//1、有一个连接错误;
			//2、在启动RPC之前,服务器已经开始排空。
			if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
				if c.failFast {
					return toRPCErr(err)
				}
				continue
			}
			return toRPCErr(err)
		}
		//接受响应Response
		err = recvResponse(cc.dopts, t, &c, stream, reply)
		if err != nil {
			if put != nil {
				put()
				put = nil
			}
			if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
				if c.failFast {
					return toRPCErr(err)
				}
				continue
			}
			return toRPCErr(err)
		}
		if c.traceInfo.tr != nil {
			c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
		}
		t.CloseStream(stream, nil)
		if put != nil {
			put()
			put = nil
		}
		return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
	}
}

六、daemon启动libContainerd

通过containerdRemote可以向grpc服务器发送请求,在containerdRemote的创建过程中则启动了Containerd后台进程并对设置了监听

containerdRemote, err := libcontainerd.New(cli.getLibcontainerdRoot(), cli.getPlatformRemoteOptions()...)

1、libcontainerd

// New creates a fresh instance of libcontainerd remote.
func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
	defer func() {
		if err != nil {
			err = fmt.Errorf("Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specified the correct address. Got error: %v", err)
		}
	}()
	r := &remote{
		stateDir:    stateDir,
		daemonPid:   -1,
		eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
	}
	for _, option := range options {
		if err := option.Apply(r); err != nil {
			return nil, err
		}
	}
	if err := sysinfo.MkdirAll(stateDir, 0700); err != nil {
		return nil, err
	}

	if r.rpcAddr == "" {
		//containerdSockFilename即,为常量是grpc服务器的地址,一个socker套接字
		r.rpcAddr = filepath.Join(stateDir, containerdSockFilename)
	}
//runContainerdDaemon()启动grpc服务器、对套件字进行监听
	if r.startDaemon {
		if err := r.runContainerdDaemon(); err != nil {
			return nil, err
		}
	}
	//不要输出GRPC重新连接日志
	grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
	dialOpts := append([]grpc.DialOption{grpc.WithInsecure()},
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}),
	)
	//调用与grpc服务器建立连接conn。
	conn, err := grpc.Dial(r.rpcAddr, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("error connecting to containerd: %v", err)
	}

	r.rpcConn = conn
	//根据该连接建立api客户端对象,通过该对象就可以通过该连接发送请求json数据
	r.apiClient = containerd.NewAPIClient(conn)
	// 获取要恢复的时间戳
	t := r.getLastEventTimestamp()
	tsp, err := ptypes.TimestampProto(t)
	if err != nil {
		logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
	}
	r.restoreFromTimestamp = tsp
	//连接健康检查,定时0.5s访问连接检查一次
	go r.handleConnectionChange()
	//开启异常事件监听,包括kill,pause等命令的处理
	if err := r.startEventsMonitor(); err != nil {
		return nil, err
	}
	return r, nil
}

2、runContainerdDaemon()函数

func (r *remote) runContainerdDaemon() error {
	pidFilename := filepath.Join(r.stateDir, containerdPidFilename)
	f, err := os.OpenFile(pidFilename, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return err
	}
	defer f.Close()
	// 文件存在,检查守护进程是否存在
	b := make([]byte, 8)
	n, err := f.Read(b)
	if err != nil && err != io.EOF {
		return err
	}
	if n > 0 {
		pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
		if err != nil {
			return err
		}
		if utils.IsProcessAlive(int(pid)) {
			logrus.Infof("libcontainerd: previous instance of containerd still alive (%d)", pid)
			r.daemonPid = int(pid)
			return nil
		}
	}
	_, err = f.Seek(0, os.SEEK_SET)
	if err != nil {
		return err
	}
	// 截断它
	err = f.Truncate(0)
	if err != nil {
		return err
	}

	// 设置docker-containerd命令执行的参数,其中-l flag指定了Containerd服务需要监听的套接字文件。
	args := []string{
		"-l", fmt.Sprintf("unix://%s", r.rpcAddr),
		"--metrics-interval=0",
		"--start-timeout", "2m",
		"--state-dir", filepath.Join(r.stateDir, containerdStateDir),
	}
	if goruntime.GOOS == "solaris" {
		args = append(args, "--shim", "containerd-shim", "--runtime", "runc")
	} else {
		args = append(args, "--shim", "docker-containerd-shim")
		if r.runtime != "" {
			args = append(args, "--runtime")
			args = append(args, r.runtime)
		}
	}
	if r.debugLog {
		args = append(args, "--debug")
	}
	if len(r.runtimeArgs) > 0 {
		for _, v := range r.runtimeArgs {
			args = append(args, "--runtime-args")
			args = append(args, v)
		}
		logrus.Debugf("libcontainerd: runContainerdDaemon: runtimeArgs: %s", args)
	}
	cmd := exec.Command(containerdBinary, args...)
	// 将containerd服务端的输入输出流信息重定向到dockerdaemon上来统一打印日志信息
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.SysProcAttr = setSysProcAttr(true)
	cmd.Env = nil
	// 启动容器时从env中清除NOTIFY_SOCKET
	for _, e := range os.Environ() {
		if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
			cmd.Env = append(cmd.Env, e)
		}
	}
	//执行docker-contained
	if err := cmd.Start(); err != nil {
		return err
	}
	logrus.Infof("libcontainerd: new containerd process, pid: %d", cmd.Process.Pid)
	if err := setOOMScore(cmd.Process.Pid, r.oomScore); err != nil {
		utils.KillProcess(cmd.Process.Pid)
		return err
	}
	if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
		utils.KillProcess(cmd.Process.Pid)
		return err
	}
//开启一个goroutine等待containerd的异常信号,若接收到异常信号关闭daemonwaitch
	r.daemonWaitCh = make(chan struct{})
	go func() {
		cmd.Wait()
		close(r.daemonWaitCh)
	}() 
	r.daemonPid = cmd.Process.Pid
	return nil
}