资讯专栏INFORMATION COLUMN

Kubernetes1.5源码分析(一) apiServer启动分析

stormgens / 1747人阅读

摘要:源码版本简介是最重要的组成部分,不论是命令操作还是通过进行控制,实际都需要经过。仅用于长时间执行的请求最小请求处理超时时间,默认仅用于该文件内设置鉴权机构一组用于运行时的配置信息。在最后会启动服务。

源码版本

Kubernetes v1.5.0

简介

apiserver是K8S最重要的组成部分,不论是命令操作还是通过remote API进行控制,实际都需要经过apiserver。
apiserver是k8s系统中所有对象的增删改查盯的http/restful式服务端,其中盯是指watch操作。数据最终存储在分布式一致的etcd存储内,apiserver本身是无状态的,提供了这些数据访问的认证鉴权、缓存、api版本适配转换等一系列的功能。

关键结构:

ServerRunOptions结构:
路径: cmd/kube-apiserver/app/options/options.go

type ServerRunOptions struct {
    // 重名,下面称为GenericServerRunOptions
    GenericServerRunOptions     *genericoptions.ServerRunOptions    // 服务器通用的运行参数
    AllowPrivileged             bool  // 是否配置超级权限,即允许Pod中运行的容器拥有系统特权
    EventTTL                    time.Duration  // 事件留存事件, 默认1h
    KubeletConfig               kubeletclient.KubeletClientConfig  // K8S kubelet配置
    MaxConnectionBytesPerSec    int64    // 每秒的最大连接数
    // 指定的话,可以通过SSH指定的秘钥文件和用户名对Node进行访问
    SSHKeyfile                  string
    SSHUser                     string
    // 包含PEM-encoded x509 RSA公钥和私钥的文件路径,用于验证Service Account的token
    // 不指定的话,则使用--tls-private-key-file指定的文件
    ServiceAccountKeyFile       string
    // 设置为true时,系统会到etcd验证ServiceAccount token是否存在
    ServiceAccountLookup        bool
    WebhookTokenAuthnConfigFile string
    WebhookTokenAuthnCacheTTL   time.Duration
}

ServerRunOptions结构:
路径: pkg/genericapiserver/options/server_run_options.go

type ServerRunOptions struct {
    // 准入控制,如:"AlwaysAdmit","LimitRanger","ReousrceQuota"等
    AdmissionControl           string      
    // 准入控制的配置文件
    AdmissionControlConfigFile string      
    // 用于广播给集群的所有成员自己的IP地址,不指定的话就使用"--bind-address"的IP地址
    AdvertiseAddress           net.IP    
    // 安全访问的认证模式列表,以逗号分隔,包括:AlwaysAllow、AlwaysDeny、ABAC、Webhook、RBAC
    AuthorizationMode                        string
    // mode设置为ABAC时使用的csv格式的授权配置文件
    AuthorizationPolicyFile                  string
    // 下列跟mode配置成webhook有关
    AuthorizationWebhookConfigFile           string
    AuthorizationWebhookCacheAuthorizedTTL   time.Duration
    AuthorizationWebhookCacheUnauthorizedTTL time.Duration
    // mode设置为RBAC时使用的超级用户名,用该用户名进行RBAC认证
    AuthorizationRBACSuperUser               string

    AnonymousAuth                bool
    // 使用http基本认证的方式访问API Server的安全端口
    BasicAuthFile                string
    // 默认"0.0.0.0",apiServer在该地址的6443端口上开启https服务
    BindAddress                  net.IP
    // TLS证书所在目录,默认"/var/run/kubernetes"
    CertDirectory                string
    // 指定的话,该客户端证书将用于认证过程
    ClientCAFile                 string
    // 下列的云服务商有关
    CloudConfigFile              string
    CloudProvider                string
    // CORS 跨域资源共享
    CorsAllowedOriginList        []string
    // 默认的持久化存储格式,比如"application/json"
    DefaultStorageMediaType      string
    // 指定清理的工作线程数,可以提高清理namespace的效率,但是会增加系统资源的占用
    DeleteCollectionWorkers      int
    // 日志相关策略
    AuditLogPath                 string
    AuditLogMaxAge               int
    AuditLogMaxBackups           int
    AuditLogMaxSize              int
    // 使能GC
    EnableGarbageCollection      bool
    // 打开性能分析,可以通过:/debug/pprof/地址来查看程序栈,线程等信息
    EnableProfiling              bool
    EnableContentionProfiling    bool
    // 使能swaggerUI,访问地址:/swagger-ui
    EnableSwaggerUI              bool
    // 使能watch cache,对所有的watch操作进行缓存
    EnableWatchCache             bool
    // 按资源覆盖etcd服务的设置,以逗号分隔,比如group/resource#servers,其中servers为: http://ip:port
    EtcdServersOverrides         []string
    StorageConfig                storagebackend.Config
    // 用于生成该master对外的URL地址
    ExternalHost                 string
    // 绑定的不安全地址,即8080端口绑定的地址
    InsecureBindAddress          net.IP
    // 非安全端口,默认8080
    InsecurePort                 int
    // 设置keystone鉴权插件地址
    KeystoneURL                  string
    KeystoneCAFile               string
    
    KubernetesServiceNodePort    int
    LongRunningRequestRE         string
    // master数量
    MasterCount                  int
    // 设置master服务所在的namespace,默认为default
    MasterServiceNamespace       string
    // 同时处理的最大请求数,默认为400,超过该请求数将被拒绝。仅用于长时间执行的请求
    MaxRequestsInFlight          int
    // 最小请求处理超时时间,默认1800s,仅用于watch request
    MinRequestTimeout            int
    // 该文件内设置鉴权机构
    OIDCCAFile                   string
    OIDCClientID                 string
    OIDCIssuerURL                string
    OIDCUsernameClaim            string
    OIDCGroupsClaim              string
    RequestHeaderUsernameHeaders []string
    RequestHeaderClientCAFile    string
    RequestHeaderAllowedNames    []string
    // 一组key=value用于运行时的配置信息。api//,用于打开或者关闭对某个API版本的支持
    // api/all和api/legacy特别用于支持所有版本的API或支持旧版本的API
    RuntimeConfig                config.ConfigurationMap
    // https安全端口,默认6443;设置为0,表示不开启https
    SecurePort                   int
    // service的Cluster IP池
    ServiceClusterIPRange        net.IPNet // TODO: make this a list
    // service的NodePort模式下能使用的主机端口号范围,默认是30000--32767
    ServiceNodePortRange         utilnet.PortRange
    // 持久化存储的资源版本号,例如"group1/version1,group2/version2,..."
    StorageVersions              string
    // The default values for StorageVersions. StorageVersions overrides
    // these; you can change this if you want to change the defaults (e.g.,
    // for testing). This is not actually exposed as a flag.
    DefaultStorageVersions string
    TargetRAMMB            int
    // TLS CA文件
    TLSCAFile              string
    // 包含x509证书的文件路径,用于https认证
    TLSCertFile            string
    // 包含x509与tls-cert-file对应的私钥文件路径
    TLSPrivateKeyFile      string
    SNICertKeys            []config.NamedCertKey
    // 用于访问APIServer安全端口的token认证文件路径
    TokenAuthFile          string
    // 使能token
    EnableAnyToken         bool
    // 设置各资源对象watch缓存大小的列表,以逗号分隔,格式为resource#size
    // 前提是EnableWatchCache为true
    WatchCacheSizes        []string
}
ApiServer启动

路径:kubernetes/cmd/kube-apiserver/apiserver.go
入口main()函数:

func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    // 新建一个apiserver对象
    s := options.NewServerRunOptions()
    // 接受用户命令行输入,其实就是自定义上述apiserver对象 
    s.AddFlags(pflag.CommandLine)
    // 解析并格式化用户传入的参数,最后填充APIServer结构体的各成员
    flag.InitFlags()
    // 初始化log配置,包括log输出位置、log等级等。
    logs.InitLogs()
    // 保证了即使apiserver异常崩溃了也能将内存中的log信息保存到磁盘文件中。 
    defer logs.FlushLogs()
    // 如果用户只是想看apiserver的版本号而不是启动apiserver,则打印apiserver的版本号并退出。
    verflag.PrintAndExitIfRequested()

    // 将创建的apiserver对象传入app.Run()中,最终绑定本地端口并绑定本地端口并创建一个HTTP Server与一个HTTPS Server。
    if err := app.Run(s); err != nil {
        fmt.Fprintf(os.Stderr, "%v
", err)
        os.Exit(1)
    }
}

新建一个apiserver对象---NewServerRunOptions():

func NewServerRunOptions() *ServerRunOptions {
    s := ServerRunOptions{
        // 初始化通用的apiserver运行参数,包括etcd后端存储参数
        GenericServerRunOptions: genericoptions.NewServerRunOptions().WithEtcdOptions(),
        // 事件的存储保留时间
        EventTTL:                1 * time.Hour,
        // Node上kubelet的客户端配置
        KubeletConfig: kubeletclient.KubeletClientConfig{
            // kubelet通信端口
            Port: ports.KubeletPort,
            PreferredAddressTypes: []string{
                string(api.NodeHostName),
                string(api.NodeInternalIP),
                string(api.NodeExternalIP),
                string(api.NodeLegacyHostIP),
            },
            // 是否开启https
            EnableHttps: true,
            // HTTP超时
            HTTPTimeout: time.Duration(5) * time.Second,
        },
        // 将webhook token authenticator返回的响应保存在缓存内的时间
        WebhookTokenAuthnCacheTTL: 2 * time.Minute,
    }
    return &s
}

上面的接口在初始化GenericServerRunOptions参数时又调用了genericoptions.NewServerRunOptions().WithEtcdOptions()接口,先来看下与上面接口名字一样的NewServerRunOptions():

func NewServerRunOptions() *ServerRunOptions {
    return &ServerRunOptions{
        // 以逗号作为分隔符的Admission Control插件的排序列表
        AdmissionControl:                         "AlwaysAdmit",
        AnonymousAuth:                            false,
        // 授权模式
        AuthorizationMode:                        "AlwaysAllow",
        AuthorizationWebhookCacheAuthorizedTTL:   5 * time.Minute,
        AuthorizationWebhookCacheUnauthorizedTTL: 30 * time.Second,
        // apiserver绑定的网卡地址
        BindAddress:                              net.ParseIP("0.0.0.0"),
        // 证书目录
        CertDirectory:                            "/var/run/kubernetes",
        // 默认的对象存储类型
        DefaultStorageMediaType:                  "application/json",
        DefaultStorageVersions:                   registered.AllPreferredGroupVersions(),
        DeleteCollectionWorkers:                  1,
        EnableGarbageCollection:                  true,
        EnableProfiling:                          true,
        EnableContentionProfiling:                false,
        EnableWatchCache:                         true,
        // HTTP绑定的IP地址
        InsecureBindAddress:                      net.ParseIP("127.0.0.1"),
        // 不安全端口(HTTP)
        InsecurePort:                             8080,
        LongRunningRequestRE:                     DefaultLongRunningRequestRE,
        // Kubernetes系统中Master的数量
        MasterCount:                              1,
        MasterServiceNamespace:                   api.NamespaceDefault,
        MaxRequestsInFlight:                      400,
        MinRequestTimeout:                        1800,
        // k8s运行时环境配置
        RuntimeConfig:                            make(config.ConfigurationMap),
        // 安全端口
        SecurePort:                               6443,
        ServiceNodePortRange:                     DefaultServiceNodePortRange,
        StorageVersions:                          registered.AllPreferredGroupVersions(),
    }
}

可以看到初始化的时候会有SecurePort、InsecurePort,实际就是对应HTTP、HTTPS的绑定端口。
我们可以看到这里的控制还是很全面的,包括安全控制(CertDirectory, HTTPS默认启动)、权限控制(AdmissionControl,AuthorizationMode)、服务限流控制(MaxRequestsInFlight)等。
具体的参数前面介绍结构体时基本都有提到。
继续后端存储etcd的配置初始化WithEtcdOptions():

func (o *ServerRunOptions) WithEtcdOptions() *ServerRunOptions {
    o.StorageConfig = storagebackend.Config{
        // etcd的默认路径前缀:/registry
        Prefix: DefaultEtcdPathPrefix,
        // 反序列化cache,未设置的话,会根据apiServer的内存限制进行配置
        DeserializationCacheSize: 0,
    }
    return o
}

到这里apiServer的运行参数初始化关键性步骤基本结束,至于后面的s.AddFlags(pflag.CommandLine)就是获取命令行的输入信息,然后进行重新覆盖,这里就不讲了。
可以根据kube-apiserver进程的命令行信息,把命令行传参和结构配置进行对应:

#/usr/bin/kube-apiserver --logtostderr=true --v=0 --etcd-servers=http://test-master:2379 --insecure-bind-address=0.0.0.0 --port=8080 --kubelet-port=10250 --allow-privileged=false --service-cluster-ip-range=10.254.0.0/16 --admission-control=NamespaceLifecycle,NamespaceExists,LimitRanger,SecurityContextDeny,ServiceAccount,ResourceQuota --service-account-key-file=/var/run/kubernetes/apiserver.key

初始化完成之后,最重要的任务就是启动实例了。
所有的操作都是在run函数中执行,app.run()接口实现在cmd/kube-apiserver/app/server.go。
RUN源码分析:

func Run(s *options.ServerRunOptions) error {
    // 检查etcd后端存储相关参数的有效性
    genericvalidation.VerifyEtcdServersList(s.GenericServerRunOptions)
    // 检查一些运行参数的有效性,并会设置一些默认值
    // 比如options.AdvertiseAddress参数没有设置,并且bind-address也没有设置,
    // k8s将会获取默认网卡的地址给该成员
    genericapiserver.DefaultAndValidateRunOptions(s.GenericServerRunOptions)
    // 根据之前初始化的GenericServerRunOptions对象来初始化创建genericapiserver.config
    // NewConfig()是初始化了一个默认的config,
    // ApplyOptions()根据GenericServerRunOptions进行再一遍的初始化
    // Complete()对一些没填充的字段,可以根据别的字段进行初始化
    // 实际NewConfig()中也调用了ApplyOptions()接口,只是参数是default值
    genericConfig := genericapiserver.NewConfig(). // create the new config
                            ApplyOptions(s.GenericServerRunOptions).
                            Complete()

    // 根据ServiceClusterIPRange输入参数,获取IPRange和ServiceIP
    serviceIPRange, apiServerServiceIP, err := genericapiserver.DefaultServiceIPRange(s.GenericServerRunOptions.ServiceClusterIPRange)
    if err != nil {TargetRAMMB
        glog.Fatalf("Error determining service IP ranges: %v", err)
    }
    // 有需要的话生成证书
    if err := genericConfig.MaybeGenerateServingCerts(apiServerServiceIP); err != nil {
        glog.Fatalf("Failed to generate service certificate: %v", err)
    }
    
    // 初始化能力集,多次运行apiserver也只会初始化一次
    capabilities.Initialize(capabilities.Capabilities{
        // 是否有超级权限
        AllowPrivileged: s.AllowPrivileged,
        // TODO(vmarmol): Implement support for HostNetworkSources.
        PrivilegedSources: capabilities.PrivilegedSources{
            HostNetworkSources: []string{},
            HostPIDSources:     []string{},
            HostIPCSources:     []string{},
        },
        // 每个用户连接的最大值,字节数/秒。当前只适用于长时间运行的请求
        PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
    })

    // 有需要的话设置网络隧道
    var tunneler genericapiserver.Tunneler
    var proxyDialerFn apiserver.ProxyDialerFunc
    // 如果运行在云平台中,则需要安装本机的SSH Key到Kubernetes集群中所有节点上
    // 可以用于通过该用户名和私钥,SSH到node上
    if len(s.SSHUser) > 0 {
        // Get ssh key distribution func, if supported
        var installSSH genericapiserver.InstallSSHKey
        cloud, err := cloudprovider.InitCloudProvider(s.GenericServerRunOptions.CloudProvider, s.GenericServerRunOptions.CloudConfigFile)
        if err != nil {
            glog.Fatalf("Cloud provider could not be initialized: %v", err)
        }
        if cloud != nil {
            if instances, supported := cloud.Instances(); supported {
                installSSH = instances.AddSSHKeyToAllInstances
            }
        }
        if s.KubeletConfig.Port == 0 {
            glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.")
        }
        // Set up the tunneler
        // TODO(cjcullen): If we want this to handle per-kubelet ports or other
        // kubelet listen-addresses, we need to plumb through options.
        healthCheckPath := &url.URL{
            Scheme: "https",
            Host:   net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)),
            Path:   "healthz",
        }
        tunneler = genericapiserver.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH)

        // Use the tunneler"s dialer to connect to the kubelet
        s.KubeletConfig.Dial = tunneler.Dial
        // Use the tunneler"s dialer when proxying to pods, services, and nodes
        proxyDialerFn = tunneler.Dial
    }

    // Proxying to pods and services is IP-based... don"t expect to be able to verify the hostname
    proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}

    // 后端存储etcd的反序列化缓存没有设置的话,根据TargetRAMMB值进行恰当的设置
    // TargetRAMMB:用户手动输入的apiServer的内存限制(单位:MB)
    // 小于1000MB的话按1000MB算
    if s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize == 0 {
        glog.V(2).Infof("Initalizing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)

        clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
        s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize = 25 * clusterSize
        if s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize < 1000 {
            s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize = 1000
        }
    }
    // 存储组版本
    storageGroupsToEncodingVersion, err := s.GenericServerRunOptions.StorageGroupsToEncodingVersion()
    if err != nil {
        glog.Fatalf("error generating storage version map: %s", err)
    }
    // 创建api工厂,包括请求头、解析工具、编码格式、API配置
    // 创建了一个DefaultStorageFactory对象
    storageFactory, err := genericapiserver.BuildDefaultStorageFactory(
        s.GenericServerRunOptions.StorageConfig, s.GenericServerRunOptions.DefaultStorageMediaType, api.Codecs,
        genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
        // FIXME: this GroupVersionResource override should be configurable
        []unversioned.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
        master.DefaultAPIResourceConfigSource(), s.GenericServerRunOptions.RuntimeConfig)
    if err != nil {
        glog.Fatalf("error in initializing storage factory: %s", err)
    }
    // 添加jobs和HPA(水平自动扩容)的接口
    storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
    storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
    // 根据用户输入的etcd-servers-overrides参数,设置对应groupResource对应的etcd地址
    for _, override := range s.GenericServerRunOptions.EtcdServersOverrides {
        tokens := strings.Split(override, "#")
        if len(tokens) != 2 {
            glog.Errorf("invalid value of etcd server overrides: %s", override)
            continue
        }

        apiresource := strings.Split(tokens[0], "/")
        if len(apiresource) != 2 {
            glog.Errorf("invalid resource definition: %s", tokens[0])
            continue
        }
        group := apiresource[0]
        resource := apiresource[1]
        groupResource := unversioned.GroupResource{Group: group, Resource: resource}

        servers := strings.Split(tokens[1], ";")
        // 上面都是解析用户输入的字符串,并生成对应的groupResource
        // 设置对应groupResource的etcdLocation
        storageFactory.SetEtcdLocation(groupResource, servers)
    }

    // 授权认证有关
    if len(s.ServiceAccountKeyFiles) == 0 && s.GenericServerRunOptions.TLSPrivateKeyFile != "" {
        if authenticator.IsValidServiceAccountKeyFile(s.GenericServerRunOptions.TLSPrivateKeyFile) {
            s.ServiceAccountKeyFiles = []string{s.GenericServerRunOptions.TLSPrivateKeyFile}
        } else {
            glog.Warning("No TLS key provided, service account token authentication disabled")
        }
    }

    var serviceAccountGetter serviceaccount.ServiceAccountTokenGetter
    // 判断是否设置为true,是的话则创建接口用于从etcd验证ServiceAccount token是否存在
    if s.ServiceAccountLookup {
        // If we need to look up service accounts and tokens,
        // go directly to etcd to avoid recursive auth insanity
        storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
        if err != nil {
            glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
        }
        serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
    }

    // 安全认证相关
    apiAuthenticator, securityDefinitions, err := authenticator.New(authenticator.AuthenticatorConfig{
        Anonymous:                   s.GenericServerRunOptions.AnonymousAuth,
        AnyToken:                    s.GenericServerRunOptions.EnableAnyToken,
        // 指定basicauthfile文件所在的位置,当这个参数不为空的时候,
        // 会开启basicauth的认证方式,这是一个.csv文件,
        // 三列分别是password,username,useruid
        BasicAuthFile:               s.GenericServerRunOptions.BasicAuthFile,
        // 用于给客户端签名的根证书,当这个参数不为空的时候,
        // 会开启https的认证方式,会通过这个根证书对客户端的证书进行身份认证
        ClientCAFile:                s.GenericServerRunOptions.ClientCAFile,
        // 用于Token文件所在的位置,当这个参数不为空的时候,会采用token的认证方式,
        // token文件也是csv的格式,分别是“token,username,userid”
        TokenAuthFile:               s.GenericServerRunOptions.TokenAuthFile,
        OIDCIssuerURL:               s.GenericServerRunOptions.OIDCIssuerURL,
        OIDCClientID:                s.GenericServerRunOptions.OIDCClientID,
        OIDCCAFile:                  s.GenericServerRunOptions.OIDCCAFile,
        OIDCUsernameClaim:           s.GenericServerRunOptions.OIDCUsernameClaim,
        OIDCGroupsClaim:             s.GenericServerRunOptions.OIDCGroupsClaim,
        // 当不为空的时候,采用ServiceAccount的认证方式,这其实是一个公钥方式。
        // 发过来的信息是客户端使用对应的私钥加密,服务端使用指定的公钥来解密信息
        ServiceAccountKeyFiles:      s.ServiceAccountKeyFiles,
        // 默认为false。如果为true的话,就会从etcd中取出对应的ServiceAccount与
        // 传过来的信息进行对比验证,反之不会
        ServiceAccountLookup:        s.ServiceAccountLookup,
        ServiceAccountTokenGetter:   serviceAccountGetter,
        KeystoneURL:                 s.GenericServerRunOptions.KeystoneURL,
        KeystoneCAFile:              s.GenericServerRunOptions.KeystoneCAFile,
        WebhookTokenAuthnConfigFile: s.WebhookTokenAuthnConfigFile,
        WebhookTokenAuthnCacheTTL:   s.WebhookTokenAuthnCacheTTL,
        RequestHeaderConfig:         s.GenericServerRunOptions.AuthenticationRequestHeaderConfig(),
    })

    if err != nil {
        glog.Fatalf("Invalid Authentication Config: %v", err)
    }

    privilegedLoopbackToken := uuid.NewRandom().String()
    selfClientConfig, err := s.GenericServerRunOptions.NewSelfClientConfig(privilegedLoopbackToken)
    if err != nil {
        glog.Fatalf("Failed to create clientset: %v", err)
    }
    client, err := s.GenericServerRunOptions.NewSelfClient(privilegedLoopbackToken)
    if err != nil {
        glog.Errorf("Failed to create clientset: %v", err)
    }
    sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)

    authorizationConfig := authorizer.AuthorizationConfig{
        PolicyFile:                  s.GenericServerRunOptions.AuthorizationPolicyFile,
        WebhookConfigFile:           s.GenericServerRunOptions.AuthorizationWebhookConfigFile,
        WebhookCacheAuthorizedTTL:   s.GenericServerRunOptions.AuthorizationWebhookCacheAuthorizedTTL,
        WebhookCacheUnauthorizedTTL: s.GenericServerRunOptions.AuthorizationWebhookCacheUnauthorizedTTL,
        RBACSuperUser:               s.GenericServerRunOptions.AuthorizationRBACSuperUser,
        InformerFactory:             sharedInformers,
    }
    authorizationModeNames := strings.Split(s.GenericServerRunOptions.AuthorizationMode, ",")
    apiAuthorizer, err := authorizer.NewAuthorizerFromAuthorizationConfig(authorizationModeNames, authorizationConfig)
    if err != nil {
        glog.Fatalf("Invalid Authorization Config: %v", err)
    }

    admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")

    // TODO(dims): We probably need to add an option "EnableLoopbackToken"
    if apiAuthenticator != nil {
        var uid = uuid.NewRandom().String()
        tokens := make(map[string]*user.DefaultInfo)
        tokens[privilegedLoopbackToken] = &user.DefaultInfo{
            Name:   user.APIServerUser,
            UID:    uid,
            Groups: []string{user.SystemPrivilegedGroup},
        }

        tokenAuthenticator := authenticator.NewAuthenticatorFromTokens(tokens)
        apiAuthenticator = authenticatorunion.New(tokenAuthenticator, apiAuthenticator)

        tokenAuthorizer := authorizer.NewPrivilegedGroups(user.SystemPrivilegedGroup)
        apiAuthorizer = authorizerunion.New(tokenAuthorizer, apiAuthorizer)
    }

    pluginInitializer := admission.NewPluginInitializer(sharedInformers, apiAuthorizer)
    // 准入控制器
    admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile, pluginInitializer)
    if err != nil {
        glog.Fatalf("Failed to initialize plugins: %v", err)
    }

    proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
        Dial:            proxyDialerFn,
        TLSClientConfig: proxyTLSClientConfig,
    })
    kubeVersion := version.Get()
    
    // genericConfig在该接口最开始进行了创建并初始化
    genericConfig.Version = &kubeVersion
    genericConfig.LoopbackClientConfig = selfClientConfig
    genericConfig.Authenticator = apiAuthenticator
    genericConfig.Authorizer = apiAuthorizer
    genericConfig.AdmissionControl = admissionController
    genericConfig.APIResourceConfigSource = storageFactory.APIResourceConfigSource
    genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
    genericConfig.OpenAPIConfig.Definitions = generatedopenapi.OpenAPIDefinitions
    genericConfig.EnableOpenAPISupport = true
    genericConfig.EnableMetrics = true
    genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions

    // master.Config配置初始化
    config := &master.Config{
        GenericConfig: genericConfig.Config,

        StorageFactory:          storageFactory,
        EnableWatchCache:        s.GenericServerRunOptions.EnableWatchCache,
        EnableCoreControllers:   true,
        DeleteCollectionWorkers: s.GenericServerRunOptions.DeleteCollectionWorkers,
        EventTTL:                s.EventTTL,
        KubeletClientConfig:     s.KubeletConfig,
        EnableUISupport:         true,
        EnableLogsSupport:       true,
        ProxyTransport:          proxyTransport,

        Tunneler: tunneler,

        ServiceIPRange:       serviceIPRange,
        APIServerServiceIP:   apiServerServiceIP,
        APIServerServicePort: 443,

        ServiceNodePortRange:      s.GenericServerRunOptions.ServiceNodePortRange,
        KubernetesServiceNodePort: s.GenericServerRunOptions.KubernetesServiceNodePort,

        MasterCount: s.GenericServerRunOptions.MasterCount,
    }
    // 判断是否对watch cache进行了使能,默认是true
    // 是true的话,会初始化watchCacheSize,然后设置各个resource的CacheSize
    if s.GenericServerRunOptions.EnableWatchCache {
        glog.V(2).Infof("Initalizing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
        cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
        cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
    }
    // 创建master
    // Complete()完善了config的初始化
    // New()进行resources的初始化及RESTful-api注册
    m, err := config.Complete().New()
    if err != nil {
        return err
    }

    sharedInformers.Start(wait.NeverStop)
    // 运行HTTP/HTTPS服务
    m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
    return nil
}

该接口调用主要用于生成master实例对象,各种api的请求最后都是通过master对象来处理的。
在最后APIServer会启动HTTP/HTTPS服务。

基本的启动流程就介绍完了,这里不进入细讲,由于大致了解下启动流程。
后面会继续分章节介绍各个关键点。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/32558.html

相关文章

  • Kubernetes1.5源码分析(三) apiServer之go-restful的使用

    摘要:它包括一组和一个对象,使用进行请求派发。流程基本就是这样,接着我们直接进入接口看实现拼装然后填充并返回一个对象创建一个这个是关键,会对各种进行注册增加一个的将该加入到前两个调用函数比较简单,这里不进行介绍了。 源码版本 Kubernetes v1.5.0 go-restful 简介 go-restful是用于构建REST-style web服务的golang包。它是出现时因为一个jav...

    Doyle 评论0 收藏0
  • Kubernetes1.5源码分析(四) apiServer资源的etcd接口实现

    摘要:为所有对外提供服务的资源实现了一套通用的符合要求的操作接口,每个服务接口负责处理一类资源对象。该接口最终返回了的和清除操作资源的接口。 源码版本 Kubernetes v1.5.0 简介 k8s的各个组件与apiServer交互操作各种资源对象,最终都会落入到etcd中。k8s为所有对外提供服务的Restful资源实现了一套通用的符合Restful要求的etcd操作接口,每个服务接口负...

    K_B_Z 评论0 收藏0
  • Kubernetes1.5源码分析(二) apiServer之资源注册

    摘要:我们先将上面的接口解析放放,先看下是如何初始化的路径定义了,再看路径定义空的创建,用于不同版本对象转换增加一些转换函数上面就创建了一个空的。其实就是向添加了转换函数,比如将转换为,将转换为。 源码版本 Kubernetes v1.5.0 简介 k8s里面有各种资源,如Pod、Service、RC、namespaces等资源,用户操作的其实也就是这一大堆资源。但这些资源并不是杂乱无章的,...

    imccl 评论0 收藏0
  • kubeadm源码分析(kubernetes离线安装包,三步安装)

    摘要:离线安装包三步安装,简单到难以置信源码分析说句实在话,的代码写的真心一般,质量不是很高。然后给该租户绑定角色。 k8s离线安装包 三步安装,简单到难以置信 kubeadm源码分析 说句实在话,kubeadm的代码写的真心一般,质量不是很高。 几个关键点来先说一下kubeadm干的几个核心的事: kubeadm 生成证书在/etc/kubernetes/pki目录下 kubeadm 生...

    Eirunye 评论0 收藏0
  • kubeadm源码分析(kubernetes离线安装包,三步安装)

    摘要:离线安装包三步安装,简单到难以置信源码分析说句实在话,的代码写的真心一般,质量不是很高。然后给该租户绑定角色。 k8s离线安装包 三步安装,简单到难以置信 kubeadm源码分析 说句实在话,kubeadm的代码写的真心一般,质量不是很高。 几个关键点来先说一下kubeadm干的几个核心的事: kubeadm 生成证书在/etc/kubernetes/pki目录下 kubeadm 生...

    Heier 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<