Docker源码学习 五

这里来看下docker run的代码实现过程。对于client来说run命令会调用CmdRun,CmdRun中的重要代码有:

    ......
    createResponse, err := cli.createContainer(config, hostConfig, hostConfig.ContainerIDFile, *flName)
    ......
    // Start the container that was just created.
    if _, _, err = readBody(cli.call("POST", "/containers/"+createResponse.ID+"/start", nil, nil)); err != nil {
        return err
    }
    ......

cli.createContainer的实现主要是调用了下面的URL:

    serverResp, err := cli.call("POST", "/containers/create?"+containerValues.Encode(), mergedConfig, nil)

所以我们下面主要分析/containers/create以及/containers/{createResponse.ID}/start。

这两个URL对应的函数为:

    "/containers/create":            s.postContainersCreate,
    "/containers/{name:.*}/start":   s.postContainersStart,

我们先来看s.postContainersCreate。postContainersCreate中最核心的代码为:

    containerId, warnings, err := s.daemon.ContainerCreate(name, config, hostConfig)
    if err != nil {
        return err
    }

这里调用了我们daemon的ContainerCreate方法,其做了参数相关的检查后会调用Create方法,Create方法如下:

// Create creates a new container from the given configuration with a given name.
func (daemon *Daemon) Create(config *runconfig.Config, hostConfig *runconfig.HostConfig, name string) (*Container, []string, error) {
    var (
        container *Container
        warnings  []string
        img       *graph.Image
        imgID     string
        err       error
    )

    if config.Image != "" {
        img, err = daemon.repositories.LookupImage(config.Image)
        if err != nil {
            return nil, nil, err
        }
        if err = daemon.graph.CheckDepth(img); err != nil {
            return nil, nil, err
        }
        imgID = img.ID
    }

首先是获取我们的镜像,接着:

    if err := daemon.mergeAndVerifyConfig(config, img); err != nil {
        return nil, nil, err
    }
    if !config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled {
        warnings = append(warnings, "IPv4 forwarding is disabled.")
    }
    if hostConfig == nil {
        hostConfig = &runconfig.HostConfig{}
    }
    if hostConfig.SecurityOpt == nil {
        hostConfig.SecurityOpt, err = daemon.GenerateSecurityOpt(hostConfig.IpcMode, hostConfig.PidMode)
        if err != nil {
            return nil, nil, err
        }
    }

这里会把镜像中的配置合并到容器配置中并做校验(mergeAndVerifyConfig),在系统关闭了IPv4转发时给出警告,在hostConfig为空时创建一个默认的HostConfig,并在未指定安全选项时生成SecurityOpt。接着:

    if container, err = daemon.newContainer(name, config, imgID); err != nil {
        return nil, nil, err
    }

这里建立了我们的容器,实现为:

// newContainer assembles the in-memory Container for a new container.
//
// It resolves the final ID and name via generateIdAndName (the name argument
// is overwritten with the returned name), calls generateHostname with the new
// ID and config, splits entrypoint and arguments out of config, and fills in a
// CommonContainer obtained from newBaseContainer. The storage layers are not
// created here; Create does that afterwards via createRootfs.
//
// Returns the new container, or nil and the error from generateIdAndName.
func (daemon *Daemon) newContainer(name string, config *runconfig.Config, imgID string) (*Container, error) {
    var (
        id  string
        err error
    )   
    id, name, err = daemon.generateIdAndName(name)
    if err != nil {
        return nil, err 
    }   

    // NOTE(review): presumably fills config.Hostname from id when the user did
    // not set one — generateHostname is not shown here, confirm.
    daemon.generateHostname(id, config)
    entrypoint, args := daemon.getEntrypointAndArgs(config.Entrypoint, config.Cmd)

    base := daemon.newBaseContainer(id)
    base.Created = time.Now().UTC()
    base.Path = entrypoint
    base.Args = args //FIXME: de-duplicate from config
    base.Config = config
    // Empty placeholder; the caller's HostConfig is applied later by
    // setHostConfig in Create.
    base.hostConfig = &runconfig.HostConfig{}
    base.ImageID = imgID
    base.NetworkSettings = &network.Settings{}
    base.Name = name
    // Record which graph driver and exec driver this container was created with.
    base.Driver = daemon.driver.String()
    base.ExecDriver = daemon.execDriver.Name()

    container := &Container{
        CommonContainer: base,
    }   

    // err is always nil here: its only assignment was checked above.
    return container, err 
}

主要的代码为daemon.newBaseContainer(id)获取一个base,然后赋予相关的属性。来看下newBaseContainer:

// newBaseContainer returns a CommonContainer value initialised with the given
// ID, a fresh State, empty mount-point and volume maps, a new exec-command
// store, and the root path computed by containerRoot(id). The root directory
// itself is created later, by os.Mkdir in createRootfs.
func (daemon *Daemon) newBaseContainer(id string) CommonContainer {
    return CommonContainer{
        ID:           id,
        State:        NewState(),
        MountPoints:  make(map[string]*mountPoint),
        Volumes:      make(map[string]string),
        VolumesRW:    make(map[string]bool),
        execCommands: newExecStore(),
        root:         daemon.containerRoot(id),
    }
}

可以看到其只是建立了一个结构体。我们继续看代码:

    if err := daemon.Register(container); err != nil {
        return nil, nil, err
    }

这里是注册了我们的容器。注册其实就是在一个map中记录一下信息。我们继续看代码:

    if err := daemon.createRootfs(container); err != nil {
        return nil, nil, err
    }

这里创建了Rootfs,也就是我们容器的文件系统,在我们这里是基于aufs的。我们看下其实现:

// createRootfs sets up the graph-driver layers for a new container.
//
// Layer chain produced (assuming driver.Create's second argument is the
// parent layer — confirm against the graphdriver interface):
//
//	container.ImageID  <-  "<container.ID>-init"  <-  container.ID
//
// The init layer is mounted with driver.Get, its path is handed to
// setupInitLayer, and it is released by a deferred driver.Put when the
// function returns. The first error encountered is returned.
//
// NOTE(review): nothing created before a failure (the root directory or any
// layer) is removed here, and the error returned by the deferred Put is
// discarded.
func (daemon *Daemon) createRootfs(container *Container) error {
    // Step 1: create the container directory.
    // This doubles as a barrier to avoid race conditions.
    if err := os.Mkdir(container.root, 0700); err != nil {
        return err 
    }   
    // Step 2: create the "<id>-init" layer on top of the image.
    initID := fmt.Sprintf("%s-init", container.ID)
    if err := daemon.driver.Create(initID, container.ImageID); err != nil {
        return err 
    }   
    // Mount the init layer so setupInitLayer can write into it.
    initPath, err := daemon.driver.Get(initID, "") 
    if err != nil {
        return err 
    }   
    defer daemon.driver.Put(initID)

    if err := setupInitLayer(initPath); err != nil {
        return err 
    }   

    // Step 3: create the container's own layer on top of the init layer.
    if err := daemon.driver.Create(container.ID, initID); err != nil {
        return err 
    }   
    return nil 
}

可以看到这里的事情都是交给driver去做的。daemon的driver就是graphdriver,我们在之前的文章中分析过它的初始化。这里先创建容器的根目录,然后以镜像层为父层创建一个“<容器ID>-init”层,并在挂载后通过setupInitLayer对其做初始化,最后再以init层为父层创建容器自己的层。对aufs来说,这就是按照aufs的要求建立对应的文件系统层。我们继续看代码:

    if err := daemon.setHostConfig(container, hostConfig); err != nil {
        return nil, nil, err
    }
    if err := container.Mount(); err != nil {
        return nil, nil, err
    }
    defer container.Unmount()

我们来看Mount和Unmount,Mount实际上是调用了graphdriver的Get函数,而Unmount则是调用了graphdriver的Put。我们来看aufs的Get的实现:

// Get returns the rootfs path for the layer id, mounting it if necessary.
//
// A layer without parents is never mounted; its "diff/<id>" directory is
// returned directly. Otherwise the mount point "mnt/<id>" is returned, and
// a.mount is only called for the first active reference (a.active[id] == 0).
// a.active[id] counts outstanding Get calls for id and is incremented on every
// successful call, including for parentless layers; it is not incremented when
// mounting fails. NOTE(review): presumably Put decrements the counter and
// unmounts at zero — confirm in Put.
func (a *Driver) Get(id, mountLabel string) (string, error) {
    ids, err := getParentIds(a.rootPath(), id) 
    if err != nil {
        // A "does not exist" error is treated as "no parents"; any other
        // error is returned.
        if !os.IsNotExist(err) {
            return "", err 
        }
        ids = []string{}
    }   

    // Protect a.active from concurrent access.
    a.Lock()
    defer a.Unlock()

    count := a.active[id]

    // If a dir does not have a parent (no layers), do not try to mount;
    // just return the diff path to the data.
    out := path.Join(a.rootPath(), "diff", id) 
    if len(ids) > 0 { 
        out = path.Join(a.rootPath(), "mnt", id)

        // Mount only for the first reference; later callers reuse the
        // existing mount.
        if count == 0 {
            if err := a.mount(id, mountLabel); err != nil {
                return "", err
            }
        }
    }

    a.active[id] = count + 1

    return out, nil
}

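可以看到这里做了两件事情:第一件是通过getParentIds获取该层所有父层的id;第二件是当该层有父层时,把文件系统mount到mnt/<id>目录下。只有在引用计数a.active[id]为0、即第一次Get时才真正执行mount;没有父层的层则不做mount,直接返回diff/<id>目录。Create剩下的代码基本是关于Volume的,我们就跳过不看了。目前我们拥有了一个包含这个容器所有信息的结构体(以base为基础构建的Container),以及一个基于aufs的容器根文件系统。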

我们来看s.postContainersStart,其核心代码为:

    if err := s.daemon.ContainerStart(vars["name"], hostConfig); err != nil {
        if err.Error() == "Container already started" {
            w.WriteHeader(http.StatusNotModified)
            return nil
        }
        return err
    }

ContainerStart的核心代码为:

    if err := container.Start(); err != nil {
        return fmt.Errorf("Cannot start container %s: %s", name, err)
    }  

我们来看Start的关键方法:

    if err := container.Mount(); err != nil {
        return err
    }

    // No-op if non-Windows. Once the container filesystem is mounted,
    // prepare the layer to boot using the Windows driver.
    if err := container.PrepareStorage(); err != nil {
        return err
    }

    if err := container.initializeNetworking(); err != nil {
        return err
    }
    linkedEnv, err := container.setupLinkedContainers()
    if err != nil {
        return err
    }
    if err := container.setupWorkingDirectory(); err != nil {
        return err
    }
    env := container.createDaemonEnvironment(linkedEnv)
    if err := populateCommand(container, env); err != nil {
        return err
    }    

    mounts, err := container.setupMounts()
    if err != nil {
        return err
    }

    container.command.Mounts = mounts
    return container.waitForStart()

container.Mount()我们已经看过,initializeNetworking用于网络环境的初始化,我们看下initializeNetworking:

    if err := container.AllocateNetwork(); err != nil {
        return err
    }
    ......
    if err := container.configureNetwork(networkName, service, networkDriver, mode.IsDefault()); err != nil {
        return err
    }
    ......
    func (container *Container) configureNetwork(networkName, service, networkDriver string, canCreateNetwork bool) error {
    controller := container.daemon.netController
    n, err := controller.NetworkByName(networkName)
    if err != nil {
        if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok || !canCreateNetwork {
            return err
        }

        if n, err = createNetwork(controller, networkName, networkDriver); err != nil {
            return err
        }
    }

可以看到网络其实最终是调用container.daemon.netController来帮助建立的,也就是通过libnetwork,我们以后会专门研究它。我们接着看container.setupWorkingDirectory:

// setupWorkingDirectory makes sure the configured working directory exists.
//
// If Config.WorkingDir is set, it is cleaned (and the cleaned value is written
// back to the config), resolved with GetResourcePath, and created with mode
// 0755 when it does not exist yet. It returns an error if the path exists but
// is not a directory, or if resolving, stat-ing or creating it fails. An empty
// WorkingDir is a no-op.
func (container *Container) setupWorkingDirectory() error {
    if container.Config.WorkingDir != "" { 
        container.Config.WorkingDir = filepath.Clean(container.Config.WorkingDir)

        // NOTE(review): GetResourcePath presumably maps the in-container path
        // to a host path under the container's rootfs — confirm.
        pth, err := container.GetResourcePath(container.Config.WorkingDir)
        if err != nil {
            return err
        }

        pthInfo, err := os.Stat(pth)
        if err != nil {
            // Only a "does not exist" error is tolerated: create the directory.
            if !os.IsNotExist(err) {
                return err
            }

            if err := system.MkdirAll(pth, 0755); err != nil {
                return err
            }
        }
        // pthInfo is nil when Stat failed (i.e. the directory was just
        // created), so this check only applies to a pre-existing path.
        if pthInfo != nil && !pthInfo.IsDir() {
            return fmt.Errorf("Cannot mkdir: %s is not a directory", container.Config.WorkingDir)
        }
    }
    return nil
}

其主要工作是:先用filepath.Clean规范化工作目录路径,如果该目录在容器中不存在,就以0755权限创建它;如果该路径已存在但不是目录,则返回错误。我们继续看代码,来看container.createDaemonEnvironment:

// createDaemonEnvironment builds the process environment for the container as
// a list of "KEY=value" strings. The list is assembled in this order:
//   - PATH, and HOSTNAME (with the domain name appended when
//     Config.Domainname is set);
//   - TERM=xterm when a TTY is requested;
//   - linkedEnv;
//   - Config.Env, whose entries replace matching keys or are appended.
func (container *Container) createDaemonEnvironment(linkedEnv []string) []string {
    // if a domain name was specified, append it to the hostname (see #7851)
    fullHostname := container.Config.Hostname
    if container.Config.Domainname != "" {
        fullHostname = fmt.Sprintf("%s.%s", fullHostname, container.Config.Domainname)
    }
    // Setup environment
    env := []string{
        "PATH=" + DefaultPathEnv,
        "HOSTNAME=" + fullHostname,
        // Note: we don't set HOME here because it'll get autoset intelligently
        // based on the value of USER inside dockerinit, but only if it isn't
        // set already (ie, that can be overridden by setting HOME via -e or ENV
        // in a Dockerfile).
    }
    if container.Config.Tty {
        env = append(env, "TERM=xterm")
    }
    env = append(env, linkedEnv...)
    // because the env on the container can override certain default values
    // we need to replace the 'env' keys where they match and append anything
    // else.
    env = utils.ReplaceOrAppendEnvValues(env, container.Config.Env)

    return env
}

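这里生成了容器进程的环境变量env。它是一个形如“KEY=value”的[]string,而不是结构体:先放入PATH、HOSTNAME(开启TTY时还有TERM)等默认值以及link带来的变量,再用容器配置中的Env覆盖同名项或追加新项。继续看代码container.setupMounts: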

// setupMounts converts the container's mount points into execdriver mounts.
//
// Each entry of MountPoints is prepared with Setup, which yields the source
// path. The resulting list is passed through sortMounts, and the mounts
// returned by networkMounts are appended after the sorted ones. The first
// Setup error aborts the loop and is returned with a nil slice.
func (container *Container) setupMounts() ([]execdriver.Mount, error) {
    var mounts []execdriver.Mount
    // MountPoints is a map, so iteration order is unspecified; the list is
    // reordered by sortMounts below (presumably so parent destinations are
    // mounted before nested ones — confirm in sortMounts).
    for _, m := range container.MountPoints {
        // NOTE(review): this local shadows the "path" package name if the
        // file imports it.
        path, err := m.Setup()
        if err != nil {
            return nil, err 
        }

        mounts = append(mounts, execdriver.Mount{
            Source:      path,
            Destination: m.Destination,
            Writable:    m.RW,
        })
    }   

    mounts = sortMounts(mounts)
    return append(mounts, container.networkMounts()...), nil 
}

这里涉及的是volume相关的内容,我们就不看了。最后我们来看:

container.waitForStart()

其实现为:

// waitForStart creates the container's monitor (with the container's restart
// policy) and runs monitor.Start through promise.Go.
//
// It returns nil as soon as the monitor signals startSignal. If monitor.Start
// returns first, its error (possibly nil) is returned instead.
// NOTE(review): in the startSignal case monitor.Start presumably keeps running
// in the background to supervise the process; this function does not wait
// for it.
func (container *Container) waitForStart() error {
    container.monitor = newContainerMonitor(container, container.hostConfig.RestartPolicy)

    // block until we either receive an error from the initial start of the container's
    // process or until the process is running in the container
    select {
    case <-container.monitor.startSignal:
    case err := <-promise.Go(container.monitor.Start):
        return err
    }    

    return nil
}

newContainerMonitor为:

// newContainerMonitor returns an initialized containerMonitor for the provided container
// honoring the provided restart policy.
//
// The back-off increment starts at defaultTimeIncrement. stopChan and
// startSignal are created unbuffered, so a send on either blocks until a
// receiver is ready.
func newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor {
    return &containerMonitor{
        container:     container,
        restartPolicy: policy,
        timeIncrement: defaultTimeIncrement,
        stopChan:      make(chan struct{}),
        startSignal:   make(chan struct{}),
    }
}

container.monitor.Start由promise.Go异步执行,它的返回值通过promise.Go返回的channel交给上面的select;而一旦monitor发出startSignal,waitForStart就直接返回nil。monitor.Start的核心代码为:

        if exitStatus, err = m.container.daemon.Run(m.container, pipes, m.callback); err != nil {
            // if we receive an internal error from the initial start of a container then lets
            // return it instead of entering the restart loop
            if m.container.RestartCount == 0 {
                m.container.ExitCode = -1
                m.resetContainer(false)

                return err
            }

            logrus.Errorf("Error running container: %s", err)
        }

Run就是由我们的execdriver去实现了:

// Run starts the container's process through the daemon's exec driver. It
// passes the container's prepared command (c.command), the I/O pipes and
// startCallback, and returns the driver's exit status and error unchanged.
func (daemon *Daemon) Run(c *Container, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (execdriver.ExitStatus, error) {
    return daemon.execDriver.Run(c.command, pipes, startCallback)
}

对于native的driver,调用了libcontainer:

    p := &libcontainer.Process{
        Args: append([]string{c.ProcessConfig.Entrypoint}, c.ProcessConfig.Arguments...),
        Env:  c.ProcessConfig.Env,
        Cwd:  c.WorkingDir,
        User: c.ProcessConfig.User,
    }

libcontainer和libnetwork一样,我们以后会单独分析。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*