Docker Source Code Study, Part 4

This post follows the code path taken when you run the docker pull command, which downloads an image to the local machine.

The earlier posts in this series showed that the docker client sends its commands to the daemon as HTTP requests. The client-side handler for docker pull is CmdPull. Here is the code:

// CmdPull pulls an image or a repository from the registry.
//
// Usage: docker pull [OPTIONS] IMAGENAME[:TAG|@DIGEST]
func (cli *DockerCli) CmdPull(args ...string) error {
    cmd := cli.Subcmd("pull", []string{"NAME[:TAG|@DIGEST]"}, "Pull an image or a repository from a registry", true)
    allTags := cmd.Bool([]string{"a", "-all-tags"}, false, "Download all tagged images in the repository")
    cmd.Require(flag.Exact, 1)

    cmd.ParseFlags(args, true)

The first step parses the subcommand, that is, the arguments to pull. Continuing:

    var (
        v         = url.Values{}
        remote    = cmd.Arg(0)
        newRemote = remote
    )   
    taglessRemote, tag := parsers.ParseRepositoryTag(remote)
    if tag == "" && !*allTags {
        newRemote = utils.ImageReference(taglessRemote, tags.DEFAULTTAG)
    }   
    if tag != "" && *allTags {
        return fmt.Errorf("tag can't be used with --all-tags/-a")
    } 

    v.Set("fromImage", newRemote)

    // Resolve the Repository name from fqn to RepositoryInfo
    repoInfo, err := registry.ParseRepositoryInfo(taglessRemote)
    if err != nil {
        return err
    }

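This splits the reference into repository and tag. Unless --all-tags is given, it defaults the tag to latest, which yields the full name of the image to pull. Continuing: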

    _, _, err = cli.clientRequestAttemptLogin("POST", "/images/create?"+v.Encode(), nil, cli.out, repoInfo.Index, "pull")
    return err
}
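The reference handling in CmdPull can be sketched in plain Go. This is a simplified re-implementation written for illustration. It does not use Docker's parsers or utils packages. The names parseRepositoryTag, imageReference and resolvePullRef are hypothetical, and the digest detection in imageReference (checking for a ':' inside the tag part) is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultTag plays the role of tags.DEFAULTTAG in CmdPull.
const defaultTag = "latest"

// parseRepositoryTag is a simplified, hypothetical version of splitting
// "name[:tag|@digest]" into repository and tag/digest.
func parseRepositoryTag(ref string) (repo, tag string) {
	// A digest reference such as "ubuntu@sha256:..." splits on '@'.
	if i := strings.Index(ref, "@"); i >= 0 {
		return ref[:i], ref[i+1:]
	}
	// Otherwise the tag follows the last ':', unless that ':' belongs to a
	// registry host:port, in which case the suffix still contains a '/'.
	i := strings.LastIndex(ref, ":")
	if i < 0 {
		return ref, ""
	}
	if t := ref[i+1:]; !strings.Contains(t, "/") {
		return ref[:i], t
	}
	return ref, ""
}

// imageReference (hypothetical) joins a repository with a tag or a digest.
// Assumption: digests look like "sha256:<hex>", so they contain a ':'.
func imageReference(repo, ref string) string {
	if strings.Contains(ref, ":") {
		return repo + "@" + ref
	}
	return repo + ":" + ref
}

// resolvePullRef mirrors the CLI logic: default to "latest" unless
// --all-tags is set, and reject an explicit tag combined with --all-tags.
func resolvePullRef(remote string, allTags bool) (string, error) {
	repo, tag := parseRepositoryTag(remote)
	if tag == "" && !allTags {
		return imageReference(repo, defaultTag), nil
	}
	if tag != "" && allTags {
		return "", fmt.Errorf("tag can't be used with --all-tags/-a")
	}
	return remote, nil
}

func main() {
	for _, r := range []string{"centos", "centos:7", "localhost:5000/foo", "ubuntu@sha256:abc"} {
		ref, err := resolvePullRef(r, false)
		fmt.Println(r, "->", ref, err)
	}
}
```

The host:port check explains why localhost:5000/foo is treated as untagged: the text after its last ':' still contains a '/'.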

Once the full image name is known, the client sends a POST request to the daemon. As an earlier post showed, the handler registered for /images/create is:

"/images/create":                s.postImagesCreate,
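The sketch below shows the request path the CLI builds and how the daemon side reads fromImage back out of it, using only net/url. buildCreatePath and fromImage are hypothetical helpers. The real handler reads the value with r.Form.Get after parseForm.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildCreatePath follows the "/images/create?"+v.Encode() pattern in CmdPull.
func buildCreatePath(ref string) string {
	v := url.Values{}
	v.Set("fromImage", ref)
	return "/images/create?" + v.Encode()
}

// fromImage plays the daemon's role: parse the query string and read the
// same "fromImage" key that postImagesCreate reads.
func fromImage(path string) (string, error) {
	u, err := url.Parse(path)
	if err != nil {
		return "", err
	}
	return u.Query().Get("fromImage"), nil
}

func main() {
	p := buildCreatePath("docker.io/centos:7")
	fmt.Println(p)
	img, err := fromImage(p)
	fmt.Println(img, err)
}
```

Encode escapes the '/' and ':' in the image name, so the reference survives as a single query value.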

Here is the implementation of postImagesCreate:

// Creates an image from Pull or from Import
func (s *Server) postImagesCreate(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    if err := parseForm(r); err != nil {
        return err
    }

    var (
        image = r.Form.Get("fromImage")
        repo  = r.Form.Get("repo")
        tag   = r.Form.Get("tag")
    )
    authEncoded := r.Header.Get("X-Registry-Auth")
    authConfig := &cliconfig.AuthConfig{}
    if authEncoded != "" {
        authJson := base64.NewDecoder(base64.URLEncoding, strings.NewReader(authEncoded))
        if err := json.NewDecoder(authJson).Decode(authConfig); err != nil {
            // for a pull it is not an error if no auth was given
            // to increase compatibility with the existing api it is defaulting to be empty
            authConfig = &cliconfig.AuthConfig{}
        }
    }

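The handler first parses the HTTP request form and reads image (fromImage), repo and tag from it. These are the parameters a docker pull request typically carries. It then decodes the auth information from the X-Registry-Auth header. Continuing: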

    w.Header().Set("Content-Type", "application/json")

w is the HTTP response writer, and this line sets the Content-Type of the response. Continuing:

    if image != "" { //pull
        if tag == "" {
            image, tag = parsers.ParseRepositoryTag(image)
        }
        metaHeaders := map[string][]string{}
        for k, v := range r.Header {
            if strings.HasPrefix(k, "X-Meta-") {
                metaHeaders[k] = v
            }
        }

        imagePullConfig := &graph.ImagePullConfig{
            MetaHeaders: metaHeaders,
            AuthConfig:  authConfig,
            OutStream:   output,
        }

        err = s.daemon.Repositories().Pull(image, tag, imagePullConfig)
    } else { //import
    ......
    }
    if err != nil {
        if !output.Flushed() {
            return err
        }
        sf := streamformatter.NewJSONStreamFormatter()
        output.Write(sf.FormatError(err))
    }

    return nil

}
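The auth handling at the top of postImagesCreate can be reproduced on its own. This sketch uses a trimmed, hypothetical authConfig struct in place of cliconfig.AuthConfig, with only three fields and JSON tags chosen for this example. decodeRegistryAuth and encodeRegistryAuth are illustrative names. As in the handler, any decoding failure falls back to an empty config, because a pull without credentials is allowed.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
)

// authConfig is a hypothetical, trimmed stand-in for cliconfig.AuthConfig.
type authConfig struct {
	Username      string `json:"username,omitempty"`
	Password      string `json:"password,omitempty"`
	ServerAddress string `json:"serveraddress,omitempty"`
}

// decodeRegistryAuth decodes URL-safe base64 JSON from the header value.
// Any error yields an empty config, as in postImagesCreate.
func decodeRegistryAuth(header string) *authConfig {
	cfg := &authConfig{}
	if header == "" {
		return cfg
	}
	dec := base64.NewDecoder(base64.URLEncoding, strings.NewReader(header))
	if err := json.NewDecoder(dec).Decode(cfg); err != nil {
		return &authConfig{}
	}
	return cfg
}

// encodeRegistryAuth is the client-side counterpart, used here to build input.
func encodeRegistryAuth(cfg authConfig) string {
	b, _ := json.Marshal(cfg)
	return base64.URLEncoding.EncodeToString(b)
}

func main() {
	h := encodeRegistryAuth(authConfig{Username: "alice", Password: "s3cret"})
	fmt.Printf("%+v\n", *decodeRegistryAuth(h))
	fmt.Printf("%+v\n", *decodeRegistryAuth("not-base64!"))
}
```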

The handler distinguishes between pull and import. Because we supplied image (fromImage), this is a pull. The core of the pull branch is:

        imagePullConfig := &graph.ImagePullConfig{
            MetaHeaders: metaHeaders,
            AuthConfig:  authConfig,
            OutStream:   output,
        }

        err = s.daemon.Repositories().Pull(image, tag, imagePullConfig)

imagePullConfig is only a configuration object. The real work is done by s.daemon.Repositories().Pull. To see how that works, start with Repositories():

func (daemon *Daemon) Repositories() *graph.TagStore {
    return daemon.repositories
}

As the earlier posts showed, daemon.repositories is created in daemon.NewDaemon:

    repositories, err := graph.NewTagStore(filepath.Join(config.Root, "repositories-"+d.driver.String()), tagCfg)
    if err != nil {
        return nil, fmt.Errorf("Couldn't create Tag store: %s", err)
    } 

What exactly is this tag store? Here is a concrete example from my machine:

[root@dev docker]# cat /var/lib/docker/repositories-devicemapper
{"Repositories":{"docker.io/berngp/docker-zabbix":{"latest":"ad689c775bbf7fbdb03d886a3d066ffc64a179b3d5ff26ae103389af48c0508f"},"docker.io/centos":{"7":"7322fbe74aa5632b33a400959867c8ac4290e9c5112877a7754be70cfe5d66e9","latest":"7322fbe74aa5632b33a400959867c8ac4290e9c5112877a7754be70cfe5d66e9"},"docker.io/mysql":{"latest":"d63d4723d71553a06bd4f164f860d51760bfcf79de5189ec3a0b1c926d1a3fbd"}},"ConfirmDefPush":true}

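The file records, for each repository, the mapping from tag to image ID. This is the metadata behind the output of docker images. graph.NewTagStore essentially returns a handle onto the contents of this repositories-XXX file, where the suffix is the graph driver name (devicemapper here). Now for Pull():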

func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConfig) error {
    var (
        sf = streamformatter.NewJSONStreamFormatter()
    )

    // Resolve the Repository name from fqn to RepositoryInfo
    repoInfo, err := s.registryService.ResolveRepository(image)
    if err != nil {
        return err
    }

    if err := validateRepoName(repoInfo.LocalName); err != nil {
        return err
    }

It starts by resolving the repository name and checking that the name is valid. Continuing:

    c, err := s.poolAdd("pull", utils.ImageReference(repoInfo.LocalName, tag))
    if err != nil {
        if c != nil {
            // Another pull of the same repository is already taking place; just wait for it to finish
            imagePullConfig.OutStream.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
            <-c
            return nil
        }
        return err
    }
    defer s.poolRemove("pull", utils.ImageReference(repoInfo.LocalName, tag))
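The poolAdd/poolRemove pair amounts to a map of in-flight keys. Each key holds a channel that is closed when its pull finishes, so concurrent callers can wait on it. The following is a minimal sketch of that idea using hypothetical names (pullPool, add, remove). It is not the TagStore code itself.

```go
package main

import (
	"fmt"
	"sync"
)

// pullPool tracks in-flight pulls: one channel per key, closed when done.
type pullPool struct {
	mu      sync.Mutex
	pulling map[string]chan struct{}
}

func newPullPool() *pullPool {
	return &pullPool{pulling: map[string]chan struct{}{}}
}

// add registers key. If it is already in flight, add returns the existing
// channel together with an error, so the caller can wait on the channel.
func (p *pullPool) add(key string) (chan struct{}, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.pulling[key]; ok {
		return c, fmt.Errorf("pull %s is already in progress", key)
	}
	c := make(chan struct{})
	p.pulling[key] = c
	return c, nil
}

// remove closes the key's channel, which wakes every waiter, and forgets the key.
func (p *pullPool) remove(key string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.pulling[key]; ok {
		close(c)
		delete(p.pulling, key)
	}
}

func main() {
	p := newPullPool()
	if _, err := p.add("centos:latest"); err != nil {
		panic(err)
	}
	c, err := p.add("centos:latest")
	fmt.Println("second add:", err)
	go p.remove("centos:latest")
	<-c // blocks until the first pull is removed from the pool
	fmt.Println("first pull finished, waiter returns")
}
```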

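Next, poolAdd registers this pull, in effect announcing "I am about to pull this image." If another pull of the same repository and tag is already running, poolAdd returns an error together with that pull's channel. The code then prints a message, waits on the channel until the other pull finishes, and returns nil. The deferred poolRemove cleans up when this pull ends, so it does not block later pulls. Continuing: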

    // Attempt pulling official content from a provided v2 mirror
    if repoInfo.Index.Official {
        v2mirrorEndpoint, v2mirrorRepoInfo, err := configureV2Mirror(repoInfo, s.registryService)
        if err != nil {
            logrus.Errorf("Error configuring mirrors: %s", err)
            return err
        }

        if v2mirrorEndpoint != nil {
            logrus.Debugf("Attempting to pull from v2 mirror: %s", v2mirrorEndpoint.URL)
            return s.pullFromV2Mirror(v2mirrorEndpoint, v2mirrorRepoInfo, imagePullConfig, tag, sf, logName)
        }
    }

For official images, this first tries to pull from a configured v2 mirror. We are not following that path, so continuing:

    logrus.Debugf("pulling image from host %q with remote name %q", repoInfo.Index.Name, repoInfo.RemoteName)

    endpoint, err := repoInfo.GetEndpoint(imagePullConfig.MetaHeaders)
    if err != nil {
        return err
    }

This obtains the registry endpoint. Continuing:

    // TODO(tiborvass): reuse client from endpoint?
    // Adds Docker-specific headers as well as user-specified headers (metaHeaders)
    tr := transport.NewTransport(
        registry.NewTransport(registry.ReceiveTimeout, endpoint.IsSecure),
        registry.DockerHeaders(imagePullConfig.MetaHeaders)...,
    )
    client := registry.HTTPClient(tr)
    r, err := registry.NewSession(client, imagePullConfig.AuthConfig, endpoint)
    if err != nil {
        return err
    }

    if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) {
        if repoInfo.Official {
            s.trustService.UpdateBase()
        }

        logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
        if err := s.pullV2Repository(r, imagePullConfig.OutStream, repoInfo, tag, sf); err == nil {
            s.eventsService.Log("pull", logName, "")
            return nil
        } else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
            logrus.Errorf("Error from V2 registry: %s", err)
        }

        logrus.Debug("image does not exist on v2 registry, falling back to v1")
    }
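Stripped of the registry details, the control flow above is: try v2, log any unexpected error, then fall back to v1. The sketch below uses two hypothetical sentinel errors in place of registry.ErrDoesNotExist and ErrV2RegistryUnavailable, and takes the v1 pull as a function argument. In the real Pull the v1 path is presumably the code that follows this block (not shown here), and the v2 attempt only happens when the mirror/official/API-version condition holds.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Hypothetical sentinels standing in for registry.ErrDoesNotExist and
// ErrV2RegistryUnavailable.
var (
	errDoesNotExist  = errors.New("image does not exist")
	errV2Unavailable = errors.New("v2 registry unavailable")
)

// pullWithFallback tries the v2 pull first. On success it returns at once;
// on failure it logs only unexpected errors and then runs the v1 pull.
func pullWithFallback(pullV2, pullV1 func() error) error {
	err := pullV2()
	if err == nil {
		return nil
	}
	if err != errDoesNotExist && err != errV2Unavailable {
		log.Printf("Error from V2 registry: %s", err)
	}
	log.Print("falling back to v1")
	return pullV1()
}

func main() {
	v1 := func() error { fmt.Println("pulling via v1"); return nil }
	if err := pullWithFallback(func() error { return errDoesNotExist }, v1); err != nil {
		fmt.Println("pull failed:", err)
	}
}
```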

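First, an HTTP client is built with Docker-specific and user-supplied headers and wrapped in a registry session. This session is what downloads the image from the remote registry. When there are no mirrors and the repository is official or the endpoint speaks API v2, the code calls s.pullV2Repository. If that fails, unexpected errors are logged and the pull falls back to v1. The block also shows s.eventsService in use. docker has a docker events command, which is presumably built on s.eventsService. Setting that aside, s.pullV2Repository calls pullV2Tag to do the actual download: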

func (s *TagStore) pullV2Tag(r *registry.Session, out io.Writer, endpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter, auth *registry.RequestAuthorization) (bool, error) {
    logrus.Debugf("Pulling tag from V2 registry: %q", tag)

    remoteDigest, manifestBytes, err := r.GetV2ImageManifest(endpoint, repoInfo.RemoteName, tag, auth)
    if err != nil {
        return false, err
    }

    // loadManifest ensures that the manifest payload has the expected digest
    // if the tag is a digest reference.
    localDigest, manifest, verified, err := s.loadManifest(manifestBytes, tag, remoteDigest)
    if err != nil {
        return false, fmt.Errorf("error verifying manifest: %s", err)
    }

    if verified {
        logrus.Printf("Image manifest for %s has been verified", utils.ImageReference(repoInfo.CanonicalName, tag))
    }
    out.Write(sf.FormatStatus(tag, "Pulling from %s", repoInfo.CanonicalName))

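This first fetches the image manifest from the registry and verifies it. When the tag is a digest reference, loadManifest checks the payload against that digest. The manifest carries the image's metadata. The important part of the download is: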

                if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
                    In:        ioutil.NopCloser(io.TeeReader(r, verifier)),
                    Out:       out,
                    Formatter: sf,
                    Size:      int(l),
                    NewLines:  false,
                    ID:        stringid.TruncateID(img.ID),
                    Action:    "Downloading",
                })); err != nil {
                    return fmt.Errorf("unable to copy v2 image blob data: %s", err)
                }

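Here the data is copied from r straight into tmpFile. r is presumably the reader for the blob's HTTP response. On the way, the data is fed through verifier via io.TeeReader and wrapped in a progress reader that prints the Downloading status. Once the layer is in tmpFile, where does it go next?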

    if d.tmpFile != nil {
        err = s.graph.Register(d.img,
            progressreader.New(progressreader.Config{
                In:        d.tmpFile,
                Out:       out,
                Formatter: sf,
                Size:      int(d.length),
                ID:        stringid.TruncateID(d.img.ID),
                Action:    "Extracting",
            }))
        if err != nil {
            return false, err
        }

        if err := s.graph.SetDigest(d.img.ID, d.digest); err != nil {
            return false, err
        }

        // FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
    }

The file is stored in the graph through s.graph.Register, and SetDigest then records its digest. Register is implemented as follows:

// Register imports a pre-existing image into the graph.
func (graph *Graph) Register(img *Image, layerData archive.ArchiveReader) (err error) {
    if err := image.ValidateID(img.ID); err != nil {
        return err
    }

    // We need this entire operation to be atomic within the engine. Note that
    // this doesn't mean Register is fully safe yet.
    graph.imageMutex.Lock(img.ID)
    defer graph.imageMutex.Unlock(img.ID)

    defer func() {
        // If any error occurs, remove the new dir from the driver.
        // Don't check for errors since the dir might not have been created.
        // FIXME: this leaves a possible race condition.
        if err != nil {
            graph.driver.Remove(img.ID)
        }
    }()

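It first validates the image ID and takes a per-image lock. It then registers a deferred cleanup that removes the new directory from the driver if any error occurs. The core of the function is: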

    // Create root filesystem in the driver
    if err := createRootFilesystemInDriver(graph, img, layerData); err != nil {
        return err
    }

    // Apply the diff/layer
    if err := graph.storeImage(img, layerData, tmp); err != nil {
        return err
    }
    // Commit
    if err := os.Rename(tmp, graph.imageRoot(img.ID)); err != nil {
        return err
    }

Here createRootFilesystemInDriver corresponds to the driver's Create method, and storeImage corresponds to the driver's ApplyDiff. The aufs driver shows how these work, starting with Create:

// Three folders are created for each id
// mnt, layers, and diff
func (a *Driver) Create(id, parent string) error {
    if err := a.createDirsFor(id); err != nil {
        return err 
    }   
    // Write the layers metadata
    f, err := os.Create(path.Join(a.rootPath(), "layers", id))
    if err != nil {
        return err 
    }   
    defer f.Close()

    if parent != "" {
        ids, err := getParentIds(a.rootPath(), parent)
        if err != nil {
            return err 
        }

        if _, err := fmt.Fprintln(f, parent); err != nil {
            return err 
        }
        for _, i := range ids {
            if _, err := fmt.Fprintln(f, i); err != nil {
                return err
            }
        }
    }
    return nil
}

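Create sets up an entry for the id in each of the driver's three folders. It makes directories under mnt and diff (via createDirsFor) and a file under layers that records the parent id followed by all of the parent's ancestors. The comment at the top of the aufs driver file makes the layout clearer: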

/*

aufs driver directory structure

  .
  ├── layers // Metadata of layers
  │   ├── 1
  │   ├── 2
  │   └── 3
  ├── diff  // Content of the layer
  │   ├── 1  // Contains layers that need to be mounted for the id
  │   ├── 2
  │   └── 3
  └── mnt    // Mount points for the rw layers to be mounted
      ├── 1
      ├── 2
      └── 3

*/

Next, ApplyDiff:

// ApplyDiff extracts the changeset from the given diff into the
// layer with the specified id and parent, returning the size of the
// new layer in bytes.
func (a *Driver) ApplyDiff(id, parent string, diff archive.ArchiveReader) (size int64, err error) {
    // AUFS doesn't need the parent id to apply the diff.
    if err = a.applyDiff(id, diff); err != nil {
        return
    }

    return a.DiffSize(id, parent)
}
......
func (a *Driver) applyDiff(id string, diff archive.ArchiveReader) error {
    return chrootarchive.Untar(diff, path.Join(a.rootPath(), "diff", id), nil)
}

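So applyDiff simply untars the layer diff into diff/<id>. In summary, pulling an image comes down to two steps:
1. Download the layer files from the registry.
2. Register them in the graph, where the graph driver in use (aufs in this example) stores them.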
