Docker源码学习 二

这篇文章来看下docker中的apiserver的实现。

我们从api := apiserver.New(serverConfig)这个代码开始看。来看下New的实现是什么,这里serverConfig结构体的内容为:

    serverConfig := &apiserver.ServerConfig{
        Logging:     true,
        EnableCors:  daemonCfg.EnableCors,
        CorsHeaders: daemonCfg.CorsHeaders,
        Version:     dockerversion.VERSION,
    }
    serverConfig = setPlatformServerConfig(serverConfig, daemonCfg)

New的代码为:

func New(cfg *ServerConfig) *Server {
    srv := &Server{
        cfg:   cfg, 
        start: make(chan struct{}),
    }    
    r := createRouter(srv)
    srv.router = r
    return srv
}

这里建立了一个server结构体:

type Server struct {
    daemon  *daemon.Daemon
    cfg     *ServerConfig
    router  *mux.Router
    start   chan struct{}
    servers []serverCloser
}

同时这个server结构体的router被赋值了一个router结构体,这里的router是什么呢?

熟悉OpenStack中使用频率特别高的routes组件的同学应该能猜到了,这里的router就是决定了一个URL最终映射到什么处理函数的一个路由。我们可以看到createRoute的代码:

// we keep enableCors just for legacy usage, need to be removed in the future
func createRouter(s *Server) *mux.Router {
    r := mux.NewRouter()
    if os.Getenv("DEBUG") != "" {
        ProfilerSetup(r, "/debug/")
    }
    m := map[string]map[string]HttpApiFunc{
        "GET": {
            "/_ping":                          s.ping,
            ......
        },
        "POST": {
            "/auth":                         s.postAuth,
            ......
        },
        "DELETE": {
            "/containers/{name:.*}": s.deleteContainers,
            ......
        },
        "OPTIONS": {
            "": s.optionsHandler,
        },
    }

    // If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*"
    // otherwise, all head values will be passed to HTTP handler
    corsHeaders := s.cfg.CorsHeaders
    if corsHeaders == "" && s.cfg.EnableCors {
        corsHeaders = "*"
    }

    for method, routes := range m {
        for route, fct := range routes {
            logrus.Debugf("Registering %s, %s", method, route)
            // NOTE: scope issue, make sure the variables are local and won't be changed
            localRoute := route
            localFct := fct
            localMethod := method

            // build the handler function
            f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))

            // add the new route
            if localRoute == "" {
                r.Methods(localMethod).HandlerFunc(f)
            } else {
                r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(f)
                r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
            }
        }
    }

    return r
}

上面的代码首先建立了一个mux.NewRouter()结构体r,接着我们可以看到一个m的map,其中存放了HTTP请求类型、URL地址以及实际的处理函数的映射关系。然后会将这个map注入到r中,来看下面的这段代码:

            // build the handler function
            f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))

            // add the new route
            if localRoute == "" {
                r.Methods(localMethod).HandlerFunc(f)
            } else {
                r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(f)
                r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
            }

makeHttpHandler实质上是对localFct做了一个wrap,也就是说当apiserver收到一个HTTP请求的时候,makeHttpHandler生成的func会先对这个HTTP请求做一些检查,比如client和server的版本号是否ok等等,然后才会调用localFct:

        if err := handlerFunc(version, w, r, mux.Vars(r)); err != nil {
            logrus.Errorf("Handler for %s %s returned error: %s", localMethod, localRoute, err)
            httpError(w, err)
        }

接着代码会判断localRoute是否为空字符串,如果是的话就不绑定带版本号的URL了。至于r.Path(localRoute).Methods(localMethod).HandlerFunc(f)做的就是在r中存放这个信息。熟悉Python中routes或者是熟悉tornado的同学应该很容易理解这个过程。之后在看某个方法的具体实现的时候可以来这里找到URL的mapping,然后查看对应函数的实现。

现在我们有了一个server,看下这个server启动时发生了什么。在代码中可以看到:

    go func() {
        if err := api.ServeApi(flHosts); err != nil {
            logrus.Errorf("ServeAPI error: %v", err)
            serveAPIWait <- err 
            return
        }
        serveAPIWait <- nil 
    }() 

这里应该是启动我们的server了,看下其实现:

// ServeApi loops through all of the protocols sent in to docker and spawns
// off a go routine to setup a serving http.Server for each.
func (s *Server) ServeApi(protoAddrs []string) error {
    var chErrors = make(chan error, len(protoAddrs))

    for _, protoAddr := range protoAddrs {
        protoAddrParts := strings.SplitN(protoAddr, "://", 2)
        if len(protoAddrParts) != 2 {
            return fmt.Errorf("bad format, expected PROTO://ADDR")
        }
        srv, err := s.newServer(protoAddrParts[0], protoAddrParts[1])
        if err != nil {
            return err
        }
        s.servers = append(s.servers, srv...)

        for _, s := range srv {
            logrus.Infof("Listening for HTTP on %s (%s)", protoAddrParts[0], protoAddrParts[1])
            go func(s serverCloser) {
                if err := s.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
                    err = nil
                }
                chErrors <- err
            }(s)
        }
    }

    for i := 0; i < len(protoAddrs); i++ {
        err := <-chErrors
        if err != nil {
            return err
        }
    }

    return nil
}

首先根据监听的地址分别调用s.newServer(protoAddrParts[0], protoAddrParts[1])创建一个干活的server,然后后者调用Serve开始服务。s.newServer会根据监听的协议类型建立套接字,然后启动一个HttpServer:

......
    switch proto {
......
    case "tcp":
        l, err := s.initTcpSocket(addr)
        if err != nil {
            return nil, err
        }
        ls = append(ls, l)
    default:
        return nil, fmt.Errorf("Invalid protocol format: %q", proto)
    }
    var res []serverCloser
    for _, l := range ls {
        res = append(res, &HttpServer{
            &http.Server{
                Addr:    addr,
                Handler: s.router,
            },
            l,
        })
    }
    return res, nil
}

这里HttpServer的结构体为:

type HttpServer struct {
    srv *http.Server
    l   net.Listener
} 

而http.Server则是go的原生server。

稍微来理一下,我们现在有下面的结构:
* 一个api,这个api是docker实现的server,代表了提供HTTP服务的一个逻辑上的server
* api下面有一个servers数组,每个数组都是实际提供服务的server,之所以有这么多server是因为可能存在多种监听方式或者是监听的地址
* api.servers中的server的最终实现是通过go的http.Server实现的

再来接着看主线代码:

    registryService := registry.NewService(registryCfg)
    d, err := daemon.NewDaemon(daemonCfg, registryService)
    if err != nil {
        if pfile != nil {
            if err := pfile.Remove(); err != nil {
                logrus.Error(err)
            }
        }
        logrus.Fatalf("Error starting daemon: %v", err)
    }

    logrus.Info("Daemon has completed initialization")

这里我们看到了一个registryService以及一个daemon.NewDaemon的函数,registryService目前只是一个保存了配置信息的结构体:

// NewService returns a new instance of Service ready to be
// installed no an engine.
func NewService(options *Options) *Service {
    return &Service{
        Config: NewServiceConfig(options),
    }
}
......
type Service struct {
    Config *ServiceConfig
}

daemon.NewDaemon比较复杂,我们最后再看。先来继续看api的剩余代码:

    // after the daemon is done setting up we can tell the api to start
    // accepting connections with specified daemon
    api.AcceptConnections(d)
func (s *Server) AcceptConnections(d *daemon.Daemon) {
    // Tell the init daemon we are accepting requests
    s.daemon = d
    s.registerSubRouter()
    go systemd.SdNotify("READY=1")
    // close the lock so the listeners start accepting connections
    select {
    case <-s.start:
    default:
        close(s.start)
    }
}

这里通过channel释放了start的锁。start的锁是在等待呢?上面提到的套接字建立的代码中可以看到类似如下代码:

    case "tcp":
        l, err := s.initTcpSocket(addr)
    ......
    case "unix":
        l, err := sockets.NewUnixSocket(addr, s.cfg.SocketGroup, s.start)

initTcpSocket中也会传递s.start。

apiserver基本上就是这么多东西,其实就是一个带了route的http server。接下来有两大块地方要分析,一个是我们的daemon,还有一个就是请求的真正实现。我们先通过一个简单的请求的实现代码来看下daemon的作用:

    "/containers/ps":                  s.getContainersJSON,

s.getContainersJSON的实现为:

func (s *Server) getContainersJSON(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    if err := parseForm(r); err != nil {
        return err
    }   
        
    config := &daemon.ContainersConfig{
        All:     boolValue(r, "all"),
        Size:    boolValue(r, "size"),
        Since:   r.Form.Get("since"),
        Before:  r.Form.Get("before"),
        Filters: r.Form.Get("filters"),      
    }       
            
    if tmpLimit := r.Form.Get("limit"); tmpLimit != "" {
        limit, err := strconv.Atoi(tmpLimit)
        if err != nil {
            return err
        }
        config.Limit = limit
    }

    containers, err := s.daemon.Containers(config)
    if err != nil {
        return err
    }

    return writeJSON(w, http.StatusOK, containers)
}

可以看到实际上这个请求是由s.daemon完成的。我们上面看到了api其实就是一个路由请求的框架,会把URL路由的s的相关函数上。但是这些函数其实还是空架子,真正干活的是s.daemon。现在对daemon的地位及功能有了大概的认识后,我们接下来的文章会分析daemon的实现以及一些重要请求处理函数的实现。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*