一、postContainersStart
daemon的路由定义如下:
router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
查看handler函数postContainersStart得知,postContainersStart的执行步骤如下:
1、 解析request
2、调用daemon启动ContainerStart()、(vars[“name”], hostConfig, checkpoint, checkpointDir)
func (s *containerRouter) postContainersStart(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
version := httputils.VersionFromContext(ctx)
var hostConfig *container.HostConfig
//一个非nil json对象至少有7个字符。
if r.ContentLength > 7 || r.ContentLength == -1 {
if versions.GreaterThanOrEqualTo(version, "1.24") {
return validationError{fmt.Errorf("starting container with non-empty request body was deprecated since v1.10 and removed in v1.12")}
}
if err := httputils.CheckForJSON(r); err != nil {
return err
}
c, err := s.decoder.DecodeHostConfig(r.Body)
if err != nil {
return err
}
hostConfig = c
}
if err := httputils.ParseForm(r); err != nil {
return err
}
checkpoint := r.Form.Get("checkpoint")
checkpointDir := r.Form.Get("checkpoint-dir")
//参数解析完毕之后
if err := s.backend.ContainerStart(vars["name"], hostConfig, checkpoint, checkpointDir); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
return nil
}
二、ContainerStart
1、根据名称(ID)获取容器对象
2、验证配置
3、开始启动container,(container, checkpoint, checkpointDir, true)
//ContainerStart启动容器。
func (daemon *Daemon) ContainerStart(name string, hostConfig *containertypes.HostConfig, checkpoint string, checkpointDir string) error {
if checkpoint != "" && !daemon.HasExperimental() {
//checkpoint 仅仅支持在experimental mode下使用
return apierrors.NewBadRequestError(fmt.Errorf("checkpoint is only supported in experimental mode"))
}
//根据名称(ID)获取容器对象
container, err := daemon.GetContainer(name)
if err != nil {
return err
}
if container.IsPaused() {
//paused状态
return fmt.Errorf("Cannot start a paused container, try unpause instead.")
}
if container.IsRunning() {
//running状态
err := fmt.Errorf("Container already started")
return apierrors.NewErrorWithStatusCode(err, http.StatusNotModified)
}
// Windows在这里没有向后兼容性问题。非Windows系统中,为了保持兼容性,hostconfig应该在create的时候被传给该container,而不是在start期间
if runtime.GOOS != "windows" {
// 这是为了向后兼容——hostconfig应该在创建容器时传递,而不是在开始时。
if hostConfig != nil {
logrus.Warn("DEPRECATED: Setting host configuration options when the container starts is deprecated and has been removed in Docker 1.12")
oldNetworkMode := container.HostConfig.NetworkMode
if err := daemon.setSecurityOptions(container, hostConfig); err != nil {
return err
}
if err := daemon.mergeAndVerifyLogConfig(&hostConfig.LogConfig); err != nil {
return err
}
if err := daemon.setHostConfig(container, hostConfig); err != nil {
return err
}
newNetworkMode := container.HostConfig.NetworkMode
if string(oldNetworkMode) != string(newNetworkMode) {
// 如果用户在启动时改变了网络模式,请清理旧网络。这是一个已弃用的特性,已在Docker 1.12中删除
container.NetworkSettings.Networks = nil
if err := container.ToDisk(); err != nil {
return err
}
}
container.InitDNSHostConfig()
}
} else {
if hostConfig != nil {
return fmt.Errorf("Supplying a hostconfig on start is not supported. It should be supplied on create")
}
}
// 检查hostConfig是否与当前系统设置一致。它可能发生cgroups被umounted或类似的情况。
if _, err = daemon.verifyContainerSettings(container.HostConfig, nil, false); err != nil {
return err
}
// 适应旧的容器,以防我们在这个函数中有更新,旧的容器永远没有机会在创建阶段调用新函数。
if hostConfig != nil {
if err := daemon.adaptContainerSettings(container.HostConfig, false); err != nil {
return err
}
}
//开始启动container
return daemon.containerStart(container, checkpoint, checkpointDir, true)
}
1、关键函数containerStart
()函数是docker start
中的核心部分,其工作流程如下:
1、设置容器的文件系统,/var/lib/docker/overlay/{}/merged
2、 设置容器的网络模式,调用了libnetwork,即CNM模型
3、 创建/proc /dev等spec文件,对容器所特有的属性都进行设置,例如:资源限制,命名空间,安全模式等等配置信息
4、 调用containerd进行Create容器
// containerStart通过设置容器所需的一切,比如存储和网络,以及容器之间的链接,来准备容器运行。容器将等待开始运行的信号。
func (daemon *Daemon) containerStart(container *container.Container, checkpoint string, checkpointDir string, resetRestartManager bool) (err error) {
start := time.Now()
//给准备启动的container上锁
container.Lock()
defer container.Unlock()
if resetRestartManager && container.Running { // 如果已经在重启步骤中,则跳过此检查,并且resetRestartManager==false
return nil
}
if container.RemovalInProgress || container.Dead {
return fmt.Errorf("Container is marked for removal and cannot be started.")
}
// 启动失败之后的clean up操作
defer func() {
if err != nil {
container.SetError(err)
// 如果没有其他人设置它,确保我们不会让它为零
if container.ExitCode() == 0 {
container.SetExitCode(128)
}
container.ToDisk()
container.Reset(false)
daemon.Cleanup(container)
// 如果容器设置了AutoRemove标志,在清理后删除它
if container.HostConfig.AutoRemove {
container.Unlock()
if err := daemon.ContainerRm(container.ID, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
logrus.Errorf("can't remove container %s: %v", container.ID, err)
}
container.Lock()
}
}
}()
//设置container的BaseFS为该container的RWLayer的挂载点path
if err := daemon.conditionalMountOnStart(container); err != nil {
return err
}
// 确保NetworkMode有一个可接受的值。我们这样做是为了确保API向后兼容性。
container.HostConfig = runconfig.SetDefaultNetModeIfBlank(container.HostConfig)
//初始化容器的网络,默认模式bridge:同一个host主机上容器的通信通过Linux bridge进行与宿主机外部网络的通信需要通过宿主机端口进行NAT
if err := daemon.initializeNetworking(container); err != nil {
return err
}
//对容器所特有的属性都进行设置,例如:资源限制,命名空间,安全模式等等配置信息
spec, err := daemon.createSpec(container)
if err != nil {
return err
}
createOptions, err := daemon.getLibcontainerdCreateOptions(container)
if err != nil {
return err
}
if resetRestartManager {
container.ResetRestartManager(true)
}
if checkpointDir == "" {
checkpointDir = container.CheckpointDir()
}
//前面所有的参数等设置都是为了这里调用containerd服务
if err := daemon.containerd.Create(container.ID, checkpoint, checkpointDir, *spec, container.InitializeStdio, createOptions...); err != nil {
errDesc := grpc.ErrorDesc(err)
contains := func(s1, s2 string) bool {
return strings.Contains(strings.ToLower(s1), s2)
}
logrus.Errorf("Create container failed with error: %s", errDesc)
// 如果我们从容器的初始开始接收到一个内部错误,那么让我们返回它,而不是输入重新启动循环设置为127,因为容器CMD不存在
if contains(errDesc, container.Path) &&
(contains(errDesc, "executable file not found") ||
contains(errDesc, "no such file or directory") ||
contains(errDesc, "system cannot find the file specified")) {
container.SetExitCode(127)
}
if contains(errDesc, syscall.EACCES.Error()) {
container.SetExitCode(126)
}
// 试图将一个文件挂载到一个目录上,或将一个目录挂载到一个文件上,可能是从用户指定的绑定挂载
if contains(errDesc, syscall.ENOTDIR.Error()) {
errDesc += ": Are you trying to mount a directory onto a file (or vice-versa)? Check if the specified host path exists and is the expected type"
container.SetExitCode(127)
}
return fmt.Errorf("%s", errDesc)
}
containerActions.WithValues("start").UpdateSince(start)
return nil
}
三、设置文件系统
1、设置container的BaseFS为该container的RWLayer的挂载点path:/var/lib/docker/overlay/{}/merged
。
2、BaseFS 是该container整个文件系统的路径。
3、 docker exec -it {} bin/bash
进去看到的视图,就是/var/lib/docker/overlay/{}/merged下的视图
4、 也就是说,后面在容器里面做的所有修改都会实时反应到目录/var/lib/docker/overlay/{}/merged。 在外面所做的修改也就实时反馈到容器里面。
// conditionalMountOnStart是容器开始调用mount期间的一个特定于平台的帮助函数。
func (daemon *Daemon) conditionalMountOnStart(container *container.Container) error {
return daemon.Mount(container)
}
// 挂载这个容器
func (daemon *Daemon) Mount(container *container.Container) error {
//()把该RWLayer进行挂载,然后return 路径给调用者
dir, err := container.RWLayer.Mount(container.GetMountLabel())
if err != nil {
return err
}
logrus.Debugf("container mounted via layerStore: %v", dir)
//容器第一次启动的时候, = ""
if container.BaseFS != dir {
// 图形驱动程序报告的挂载路径在Windows上应该始终是可信的,因为给定挂载层的卷路径可能会随时间变化。这应该只在非windows操作系统上是一个错误。
if container.BaseFS != "" && runtime.GOOS != "windows" {
daemon.Unmount(container)
return fmt.Errorf("Error: driver %s is returning inconsistent paths for container %s ('%s' then '%s')",
daemon.GraphDriverName(), container.ID, container.BaseFS, dir)
}
}
container.BaseFS = dir
return nil
}
func (rl *referencedRWLayer) Mount(mountLabel string) (string, error) {
//依靠和mountLabel获取一个container整个文件系统的路径
return rl.layerStore.driver.Get(rl.mountedLayer.mountID, mountLabel)
}
1、 overlay
// Get函数为给定id创建并挂载所需的文件系统,并返回挂载路径。
func (d *Driver) Get(id string, mountLabel string) (s string, err error) {
//入参id 表示一个layer所在的目录路径 /var/lib/docker/overlsy/{id}
dir := d.dir(id)
if _, err := os.Stat(dir); err != nil {
return "", err
}
// 如果id有一个根,就返回它
rootDir := path.Join(dir, "root")
if _, err := os.Stat(rootDir); err == nil {
return rootDir, nil
}
mergedDir := path.Join(dir, "merged")
//增加一次一个layer的引用次数
if count := d.ctr.Increment(mergedDir); count > 1 {
return mergedDir, nil
}
defer func() {
if err != nil {
if c := d.ctr.Decrement(mergedDir); c <= 0 {
syscall.Unmount(mergedDir, 0)
}
}
}()
lowerID, err := ioutil.ReadFile(path.Join(dir, "lower-id"))
if err != nil {
return "", err
}
var (
lowerDir = path.Join(d.dir(string(lowerID)), "root")
upperDir = path.Join(dir, "upper")
workDir = path.Join(dir, "work")
opts = fmt.Sprintf("lowerdir=%s,upperdir=%s,workdir=%s", lowerDir, upperDir, workDir)
)
//调用
if err := syscall.Mount("overlay", mergedDir, "overlay", 0, label.FormatMountLabel(opts, mountLabel)); err != nil {
return "", fmt.Errorf("error creating overlay mount to %s: %v", mergedDir, err)
}
// chown "workdir/work"到重新映射的根UID/GID。在用户命名空间内覆盖fs要求将目录从下往上移动。
rootUID, rootGID, err := idtools.GetRootUIDGID(d.uidMaps, d.gidMaps)
if err != nil {
return "", err
}
if err := os.Chown(path.Join(workDir, "work"), rootUID, rootGID); err != nil {
return "", err
}
return mergedDir, nil
}
四、设置网络模式
//docker 4种网络模式:bridge hots none container
func (daemon *Daemon) initializeNetworking(container *container.Container) error {
var err error
//如果网络模式是 container模式,获取指定容器的网络配置,join in
if container.HostConfig.NetworkMode.IsContainer() {
//我们需要从容器中获取要联接的主机文件
nc, err := daemon.getNetworkedContainer(container.ID, container.HostConfig.NetworkMode.ConnectedContainer())
if err != nil {
return err
}
initializeNetworkingPaths(container, nc)
container.Config.Hostname = nc.Config.Hostname
container.Config.Domainname = nc.Config.Domainname
return nil
}
//如果网络模式是 host模式,与宿主机共享ip仅需设置
if container.HostConfig.NetworkMode.IsHost() {
if container.Config.Hostname == "" {
container.Config.Hostname, err = os.Hostname()
if err != nil {
return err
}
}
}
//分配网络资源
if err := daemon.allocateNetwork(container); err != nil {
return err
}
//把前面对container的hostname设置持久化,写入到/var/lib/docker/containers/{}/hostname
return container.BuildHostnameFile()
}
1、allocateNetwork()
//下面函数主要实现网络服务定位
func (daemon *Daemon) allocateNetwork(container *container.Container) error {
start := time.Now()
//daemon的网络controller,其类型是
controller := daemon.netController
if daemon.netController == nil {
return nil
}
//清理一些残留无用的sandbox盒,sandbox就是一个容器的网络栈,相当于一个Network namespace
if err := controller.SandboxDestroy(container.ID); err != nil {
logrus.Errorf("failed to cleanup up stale network sandbox for container %s", container.ID)
}
updateSettings := false
if len(container.NetworkSettings.Networks) == 0 {
if container.Config.NetworkDisabled || container.HostConfig.NetworkMode.IsContainer() {
return nil
}
//更新容器的网络设置,其实就是根据容器网络模式来更新的映射关系,就是网络名和endpoint的关系
daemon.updateContainerNetworkSettings(container, nil)
updateSettings = true
}
// 总是先连接默认网络,因为只有默认网络模式支持链接,我们需要对沙箱初始化链接做一些设置,但沙箱只在第一次网络连接时初始化。
defaultNetName := runconfig.DefaultDaemonNetworkMode().NetworkName()
if nConf, ok := container.NetworkSettings.Networks[defaultNetName]; ok {
cleanOperationalData(nConf)
//第一次尝试连接默认的网络名,同时可以完成沙盒的初始化
if err := daemon.connectToNetwork(container, defaultNetName, nConf.EndpointSettings, updateSettings); err != nil {
return err
}
}
networks := make(map[string]*network.EndpointSettings)
for n, epConf := range container.NetworkSettings.Networks {
if n == defaultNetName {
continue
}
networks[n] = epConf
}
for netName, epConf := range networks {
cleanOperationalData(epConf)
//将所有endpoint连接到网络
if err := daemon.connectToNetwork(container, netName, epConf.EndpointSettings, updateSettings); err != nil {
return err
}
}
//持久化容器的hostconfig信息,/var/lib/docker/containers/{id}/
if err := container.WriteHostConfig(); err != nil {
return err
}
networkActions.WithValues("allocate").UpdateSince(start)
return nil
}
五、调用containerd进行Create容器
调用libcontainerd模块
func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
clnt.lock(containerID)
defer clnt.unlock(containerID)
//获取libcontainerd模块中的containers,这里面的容器应该包括正在运行的容器
if _, err := clnt.getContainer(containerID); err == nil {
return fmt.Errorf("Container %s is already active", containerID)
}
//获取gid和uid
uid, gid, err := getRootIDs(specs.Spec(spec))
if err != nil {
return err
}
//迭代创建 statedir 所需目录,该目录路径在定义libcontainerd模块的时候已经指定
dir, err := clnt.prepareBundleDir(uid, gid)
if err != nil {
return err
}
//创建一个containerCommon对象
container := clnt.newContainer(filepath.Join(dir, containerID), options...)
if err := container.clean(); err != nil {
return err
}
defer func() {
if err != nil {
container.clean()
clnt.deleteContainer(containerID)
}
}()
//创建容器目录
if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
return err
}
//创建配置文件路径,并根据spec创建配置文件
f, err := os.Create(filepath.Join(container.dir, configFilename))
if err != nil {
return err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(spec); err != nil {
return err
}
//容器启动
return container.start(checkpoint, checkpointDir, attachStdio)
}
1、()
//func (ctr *container) start配置containerd创建容器的标准输入输出端及请求数据的设置,并发送请求
func (ctr *container) start(checkpoint string, checkpointDir string, attachStdio StdioCallback) (err error) {
spec, err := ctr.spec() //根据读取的配置文件信息设置spec对象
if err != nil {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ready := make(chan struct{})
fifoCtx, cancel := context.WithCancel(context.Background())
defer func() {
if err != nil {
cancel()
}
}()
//创建一个先进先出的io管道对象,包括可读端和可写端
iopipe, err := ctr.openFifos(fifoCtx, spec.Process.Terminal)
if err != nil {
return err
}
var stdinOnce sync.Once
// 我们需要在容器启动后延迟stdin关闭,否则“stdin close”事件将被containerd拒绝。stdin闭包在attachStdio中发生
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // 在附加错误时,我们不知道stdin是否已经关闭
err = stdin.Close()
go func() {
select {
case <-ready:
case <-ctx.Done():
}
select {
case <-ready:
if err := ctr.sendCloseStdin(); err != nil {
logrus.Warnf("failed to close stdin: %+v", err)
}
default:
}
}()
})
return err
})
//定义对containerd模块的请求对象
r := &containerd.CreateContainerRequest{
Id: ctr.containerID,
BundlePath: ctr.dir,
Stdin: ctr.fifo(syscall.Stdin),
Stdout: ctr.fifo(syscall.Stdout),
Stderr: ctr.fifo(syscall.Stderr),
Checkpoint: checkpoint,
CheckpointDir: checkpointDir,
//检查我们是否在ramdisk中运行以禁用枢轴根
NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
Runtime: ctr.runtime,
RuntimeArgs: ctr.runtimeArgs,
}
ctr.client.appendContainer(ctr)
if err := attachStdio(*iopipe); err != nil {
ctr.closeFifos(iopipe)
return err
}
//向containerd模块发送请求数据,实现容器创建grpc调用
resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r)
if err != nil {
ctr.closeFifos(iopipe)
return err
}
ctr.systemPid = systemPid(resp.Container)
close(ready)
//启动成功后更新daemon中的容器状态
return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{
CommonStateInfo: CommonStateInfo{
State: StateStart,
Pid: ctr.systemPid,
}})
}
2、发送grpc请求
func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
out := new(CreateContainerResponse)
err := grpc.Invoke(ctx, "//CreateContainer", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Invoke在网络上发送RPC请求,并在接收到响应后返回。
//调用由生成的代码调用。用户也可以在用例中真正需要时直接调用Invoke。
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
//检查、设置参数
c := defaultCallInfo
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
}
}
defer func() {
for _, o := range opts {
o.after(&c)
}
}()
if EnableTracing {
c.traceInfo.tr = trace.New("."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
c.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
}
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
defer func() {
if err != nil {
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
c.traceInfo.tr.SetError()
}
}()
}
topts := &transport.Options{
Last: true,
Delay: false,
}
for {
var (
err error
t transport.ClientTransport
stream *transport.Stream
// 从(…)记录put处理程序。一旦RPC完成或失败,它就被调用。
put func()
)
// 发送请求的目的地址:Host和路由:
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
//可能会重新讨论错误处理。
if _, ok := err.(*rpcError); ok {
return err
}
if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
return Errorf(codes.Unavailable, "%v", err)
}
continue
}
//所有其他错误都被视为内部错误`
return Errorf(codes.Internal, "%v", err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
//发送Request
stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
if err != nil {
if put != nil {
put()
put = nil
}
//重试一个非failfast RPC时
//1、有一个连接错误;
//2、在启动RPC之前,服务器已经开始排空。
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
//接受响应Response
err = recvResponse(cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
return toRPCErr(err)
}
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
}
t.CloseStream(stream, nil)
if put != nil {
put()
put = nil
}
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
}
}
六、daemon启动libContainerd
通过containerdRemote可以向grpc服务器发送请求,在containerdRemote的创建过程中则启动了Containerd后台进程并对设置了监听
containerdRemote, err := libcontainerd.New(cli.getLibcontainerdRoot(), cli.getPlatformRemoteOptions()...)
1、libcontainerd
// New creates a fresh instance of libcontainerd remote.
func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specified the correct address. Got error: %v", err)
}
}()
r := &remote{
stateDir: stateDir,
daemonPid: -1,
eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
}
for _, option := range options {
if err := option.Apply(r); err != nil {
return nil, err
}
}
if err := sysinfo.MkdirAll(stateDir, 0700); err != nil {
return nil, err
}
if r.rpcAddr == "" {
//containerdSockFilename即,为常量是grpc服务器的地址,一个socker套接字
r.rpcAddr = filepath.Join(stateDir, containerdSockFilename)
}
//runContainerdDaemon()启动grpc服务器、对套件字进行监听
if r.startDaemon {
if err := r.runContainerdDaemon(); err != nil {
return nil, err
}
}
//不要输出GRPC重新连接日志
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
dialOpts := append([]grpc.DialOption{grpc.WithInsecure()},
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
//调用与grpc服务器建立连接conn。
conn, err := grpc.Dial(r.rpcAddr, dialOpts...)
if err != nil {
return nil, fmt.Errorf("error connecting to containerd: %v", err)
}
r.rpcConn = conn
//根据该连接建立api客户端对象,通过该对象就可以通过该连接发送请求json数据
r.apiClient = containerd.NewAPIClient(conn)
// 获取要恢复的时间戳
t := r.getLastEventTimestamp()
tsp, err := ptypes.TimestampProto(t)
if err != nil {
logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
}
r.restoreFromTimestamp = tsp
//连接健康检查,定时0.5s访问连接检查一次
go r.handleConnectionChange()
//开启异常事件监听,包括kill,pause等命令的处理
if err := r.startEventsMonitor(); err != nil {
return nil, err
}
return r, nil
}
2、runContainerdDaemon()函数
func (r *remote) runContainerdDaemon() error {
pidFilename := filepath.Join(r.stateDir, containerdPidFilename)
f, err := os.OpenFile(pidFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
defer f.Close()
// 文件存在,检查守护进程是否存在
b := make([]byte, 8)
n, err := f.Read(b)
if err != nil && err != io.EOF {
return err
}
if n > 0 {
pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
if err != nil {
return err
}
if utils.IsProcessAlive(int(pid)) {
logrus.Infof("libcontainerd: previous instance of containerd still alive (%d)", pid)
r.daemonPid = int(pid)
return nil
}
}
_, err = f.Seek(0, os.SEEK_SET)
if err != nil {
return err
}
// 截断它
err = f.Truncate(0)
if err != nil {
return err
}
// 设置docker-containerd命令执行的参数,其中-l flag指定了Containerd服务需要监听的套接字文件。
args := []string{
"-l", fmt.Sprintf("unix://%s", r.rpcAddr),
"--metrics-interval=0",
"--start-timeout", "2m",
"--state-dir", filepath.Join(r.stateDir, containerdStateDir),
}
if goruntime.GOOS == "solaris" {
args = append(args, "--shim", "containerd-shim", "--runtime", "runc")
} else {
args = append(args, "--shim", "docker-containerd-shim")
if r.runtime != "" {
args = append(args, "--runtime")
args = append(args, r.runtime)
}
}
if r.debugLog {
args = append(args, "--debug")
}
if len(r.runtimeArgs) > 0 {
for _, v := range r.runtimeArgs {
args = append(args, "--runtime-args")
args = append(args, v)
}
logrus.Debugf("libcontainerd: runContainerdDaemon: runtimeArgs: %s", args)
}
cmd := exec.Command(containerdBinary, args...)
// 将containerd服务端的输入输出流信息重定向到dockerdaemon上来统一打印日志信息
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.SysProcAttr = setSysProcAttr(true)
cmd.Env = nil
// 启动容器时从env中清除NOTIFY_SOCKET
for _, e := range os.Environ() {
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
cmd.Env = append(cmd.Env, e)
}
}
//执行docker-contained
if err := cmd.Start(); err != nil {
return err
}
logrus.Infof("libcontainerd: new containerd process, pid: %d", cmd.Process.Pid)
if err := setOOMScore(cmd.Process.Pid, r.oomScore); err != nil {
utils.KillProcess(cmd.Process.Pid)
return err
}
if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
utils.KillProcess(cmd.Process.Pid)
return err
}
//开启一个goroutine等待containerd的异常信号,若接收到异常信号关闭daemonwaitch
r.daemonWaitCh = make(chan struct{})
go func() {
cmd.Wait()
close(r.daemonWaitCh)
}()
r.daemonPid = cmd.Process.Pid
return nil
}