摘要:用于获取元数据及根据的来匹配该会使用到的接口如下用于根据反推根据获取元数据提供了接口用于获取指定下管理的所有通过的数据变更,比如,来操作该。
k8s version: v1.11.0源码流程图 JobController 结构author: lbl167612@alibaba-inc.com
路径:pkg/controller/job/job_controller.go
type JobController struct { // 访问 kube-apiserver 的client // 需要查询 job、pod 等元数据信息 kubeClient clientset.Interface // pod 控制器,用于创建和删除pod使用 podControl controller.PodControlInterface // 用于更新 job status updateHandler func(job *batch.Job) error // job controller 核心接口,用于 sync job syncHandler func(jobKey string) (bool, error) // job controller 在启动时会对 job & pod 先进行同步 // 用于判断是否已经对 pod 同步过 podStoreSynced cache.InformerSynced // 用于判断是否已经对 job 同步过 jobStoreSynced cache.InformerSynced // expectations cache,记录该job下pods的adds & dels次数, // 并提供接口进行调整,已达到期望值。 expectations controller.ControllerExpectationsInterface // jobLister 用于获取job元数据及根据pod的labels来匹配jobs // 该controller 会使用到的接口如下: // 1. GetPodJobs(): 用于根据pod反推jobs // 2. Get(): 根据namespace & name 获取job 元数据 jobLister batchv1listers.JobLister // podStore 提供了接口用于获取指定job下管理的所有pods podStore corelisters.PodLister // Jobs queue // job controller通过kubeClient watch jobs & pods的数据变更, // 比如add、delete、update,来操作该queue。 // 并启动相应的worker,调用syncJob处理该queue中的jobs。 queue workqueue.RateLimitingInterface // jobs的相关events,通过该recorder进行广播 recorder record.EventRecorder }startJobController()
路径:cmd/kube-controller-manager/app/batch.go
startJobController() 是启动 job controller 的入口函数,该函数会注册到 kube-controller-manager 组件的 NewControllerInitializers() 接口中。
具体的 kube-controller-manager 组件的启动实现可以自己看下相关代码,这里先只关注 job controller 的实现。
func startJobController(ctx ControllerContext) (bool, error) { // 在启动job controller之前,判断下job 是否有配置生效 // 用户可以在创建k8s clusters时,通过修改kube-apiserver --runtime-config配置想要生效的 resource if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] { return false, nil } // 初始化 JobController结构,并Run // Run的时候指定了gorutinue的数量,每个gorutinue 就是一个worker go job.NewJobController( ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Batch().V1().Jobs(), ctx.ClientBuilder.ClientOrDie("job-controller"), ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop) return true, nil }NewJobController()
路径:pkg/controller/job/job_controller.go
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController { // 初始化event broadcaster // 用于该controller 发送job 相关的events eventBroadcaster := record.NewBroadcaster() // 注册打印event信息的function // eventBroadcaster.StartEventWatcher()会创建gorutinue并开始watch event, // 根据注册的eventHandler轮询处理每个event,这里就是通过glog.Infof打印日志 eventBroadcaster.StartLogging(glog.Infof) // EventSinkImpl 包含了一个EventInterface, 实现了Create/Update/Delete/Get/Watch/Patch..等等操作 // 这一步跟上面一样,也是通过eventBroadcaster.StartEventWatcher() 注册了EventInterface实现, // 用来从指定的eventBroadcaster接收event,并发送给指定的接收器。 // k8s event实现可以多带带进行源码分析,值得学习下。 eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) // kubernetes 内部的限流策略 // 对apiserver来说,每个controller及scheduler都是client,所以内部的限流策略也至关重要。 if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) } // 初始化JobController jm := &JobController{ // 连接kube-apiserver的client kubeClient: kubeClient, // podControl,用于manageJob()中创建和删除pod podControl: controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), }, // 维护的期望状态下的Pod Cache,并且提供了修正该Cache的接口 // 比如会存jobs 下pods 的adds & dels 值,并提供了接口修改这两个值。 expectations: controller.NewControllerExpectations(), // jobs queue, 后面会创建对应数量的workers 从该queue 中处理各个jobs。 queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"), // event recorder,用于发送job 相关的events recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), } // 注册jobInformer 的Add、Update、Delete 函数 // 该controller 获取到job 的Add、Update、Delete事件之后,会调用对应的function // 这些function 的核心还是去操作了上面的queue,让syncJob 处理queue 中的jobs jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { jm.enqueueController(obj, true) }, UpdateFunc: jm.updateJob, DeleteFunc: func(obj interface{}) { jm.enqueueController(obj, true) }, }) // 上面结构中已经有介绍 jm.jobLister = jobInformer.Lister() jm.jobStoreSynced = jobInformer.Informer().HasSynced // 注册 podInformer 的Add、Update、Delete 函数 // job 最终是依托了pod 去运行,所以相关的pods 事件也需要关心。 // 该podInformer 会监听所有的pods 变更事件,所以函数中都会去判断该pod 的containerRef是否是“job”, // 如果是的话再更新对应的expectations & queue, 触发syncJob进行处理。 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, UpdateFunc: jm.updatePod, DeleteFunc: jm.deletePod, }) // 上面结构中已经有介绍 jm.podStore = podInformer.Lister() jm.podStoreSynced = podInformer.Informer().HasSynced // 注册更新job status的函数 jm.updateHandler = jm.updateJobStatus // 注册sync job handler // 核心实现 jm.syncHandler = jm.syncJob return jm }Run()
路径:pkg/controller/job/job_controller.go
// Run the main goroutine responsible for watching and syncing jobs. func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer jm.queue.ShutDown() glog.Infof("Starting job controller") defer glog.Infof("Shutting down job controller") // 每次启动都会先等待Job & Pod cache 是否有同步过,即指queue是否已经同步过数据, // 因为每个worker干的活都是从queue中获取,所以只有queue有数据才应该继续往下创建worker。 if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) { return } // 创建指定数量的gorutinue // 每个gorutinue 执行worker,每个worker 执行完了之后sleep 1s,然后继续循环执行 for i := 0; i < workers; i++ { go wait.Until(jm.worker, time.Second, stopCh) } <-stopCh }
看下具体的worker 实现:
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (jm *JobController) worker() { for jm.processNextWorkItem() { } } func (jm *JobController) processNextWorkItem() bool { // 从queque 中获取job key // key 构成: namespace + "/" + name key, quit := jm.queue.Get() if quit { return false } defer jm.queue.Done(key) // 调用初始化时注册的 syncJob() // 如果执行成功,且forget = true, 则从queue 中删除该 key。 forget, err := jm.syncHandler(key.(string)) if err == nil { if forget { jm.queue.Forget(key) } return true } // 如果syncJob() 出错, 则打印出错信息 // 该utilruntime.HandleError() 会记录最近一次的错误时间点并进行限速,防止频繁打印错误信息。 utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err)) // 如果syncJob() 出错,则把该job key 继续丢回queue 中, 等待下次sync。 jm.queue.AddRateLimited(key) return true }syncJob()
worker的关键就是调用了syncJob,下面继续看下该函数具体做了什么:
func (jm *JobController) syncJob(key string) (bool, error) { // 惯用招数,看下每次sync 花了多久 startTime := time.Now() defer func() { glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) }() // 把key 拆分成job namespace & name ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return false, err } if len(ns) == 0 || len(name) == 0 { return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key) } // 获取job 信息 // 如果没有找到该job的话,表示已经被删除,并从ControllerExpectations中删除该key sharedJob, err := jm.jobLister.Jobs(ns).Get(name) if err != nil { if errors.IsNotFound(err) { glog.V(4).Infof("Job has been deleted: %v", key) jm.expectations.DeleteExpectations(key) return true, nil } return false, err } job := *sharedJob // 根据job.Status.Conditions是否处于“JobComplete” or "JobFailed", 来判断该job 是否已经完成。 // 如果已经完成的话,直接return if IsJobFinished(&job) { return true, nil } // 根据该 job key 失败的次数来计算该job 已经重试的次数。 // job 默认会有6次的重试机会 previousRetry := jm.queue.NumRequeues(key) // 判断该key 是否需要调用manageJob()进行sync,条件如下: // 1. 该key 在ControllerExpectations中的adds和dels 都 <= 0 // 2. 该key 在ControllerExpectations中已经超过5min没有更新了 // 3. 该key 在ControllerExpectations中没有查到 // 4. 调用GetExpectations()接口失败 jobNeedsSync := jm.expectations.SatisfiedExpectations(key) // 获取该job管理的所有pods pods, err := jm.getPodsForJob(&job) if err != nil { return false, err } // 获取处于active 的pods activePods := controller.FilterActivePods(pods) // 获取active & succeeded & failed pods数量 active := int32(len(activePods)) succeeded, failed := getStatus(pods) conditions := len(job.Status.Conditions) // 看下该job是否是第一次启动,是的话,设置StartTime; // 并判断是否设置了job.Spec.ActiveDeadlineSeconds, 如果设置了的话,在ActiveDeadlineSeconds秒后,在将该key 丢入queue if job.Status.StartTime == nil { now := metav1.Now() job.Status.StartTime = &now // enqueue a sync to check if job past ActiveDeadlineSeconds if job.Spec.ActiveDeadlineSeconds != nil { glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds", key, *job.Spec.ActiveDeadlineSeconds) jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second) } } var manageJobErr error jobFailed := false var failureReason string var failureMessage string // 确认该job是否有新的pod failed jobHaveNewFailure := failed > job.Status.Failed // 确认重试次数是否有超出预期值 exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) // 如果job重试的次数超过了job.Spec.BackoffLimit(默认是6次),则标记该job为failed并指明原因; // 计算job重试的次数,还跟job中的pod template设置的重启策略有关,如果设置成“RestartPolicyOnFailure”, // job重试的次数 = 所有pods InitContainerStatuses 和 ContainerStatuses 的RestartCount 之和, // 也需要判断这个重试次数是否超过 BackoffLimit; if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) { jobFailed = true failureReason = "BackoffLimitExceeded" failureMessage = "Job has reached the specified backoff limit" // 如果job 运行的时间超过了ActiveDeadlineSeconds,则标记该job为failed并指明原因 } else if pastActiveDeadline(&job) { jobFailed = true failureReason = "DeadlineExceeded" failureMessage = "Job was active longer than specified deadline" } // 如果job failed,则并发等待所有active pods删除结束; // 修改job.Status.Conditions, 并且根据之前记录的失败信息发送event if jobFailed { errCh := make(chan error, active) jm.deleteJobPods(&job, activePods, errCh) select { case manageJobErr = <-errCh: if manageJobErr != nil { break } default: } failed += active active = 0 job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage)) jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) // 如果job 没有标记为failed } else { // 根据之前判断的job是否需要sync,且该job 还未被删除,则调用mangeJob()。 // manageJob() 后面多带带解析 if jobNeedsSync && job.DeletionTimestamp == nil { active, manageJobErr = jm.manageJob(activePods, succeeded, &job) } completions := succeeded complete := false // job.Spec.Completions 表示该job只有成功创建这些数量的pods,才算完成。 // 如果该值没有设置,表示只要其中有一个pod 成功过,该job 就算完成了, // 但是需要注意,如果当前还有正在运行的pods,则需要等待这些pods都退出,才能标记该job完成任务了。 if job.Spec.Completions == nil { if succeeded > 0 && active == 0 { complete = true } // 如果设置了Completions值,只要该job下成功创建的pods数量 >= Completions,该job就成功结束了。 // 还需要发送一些异常events, 比如已经达到要求的成功创建的数量后,还有处于active的pods; // 或者成功的次数 > 指定的次数,这些应该都是预期之外的事件。 } else { if completions >= *job.Spec.Completions { complete = true if active > 0 { jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached") } if completions > *job.Spec.Completions { jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") } } } // 如果job成功结束,则更新job.Status.Conditions && job.Status.CompletionTime if complete { job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) now := metav1.Now() job.Status.CompletionTime = &now } } forget := false // 如果这次有成功的pod 产生,则forget 该次job key if job.Status.Succeeded < succeeded { forget = true } // 更新job.Status if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions { job.Status.Active = active job.Status.Succeeded = succeeded job.Status.Failed = failed // 更新job失败的话,将该job key继续丢入queue中。 if err := jm.updateHandler(&job); err != nil { return forget, err } // 如果这次job 有新的pod failed,且该job还未完成,则继续把该job key丢入queue中 if jobHaveNewFailure && !IsJobFinished(&job) { // returning an error will re-enqueue Job after the backoff period return forget, fmt.Errorf("failed pod(s) detected for job key %q", key) } // 否则forget job forget = true } return forget, manageJobErr }manageJob()
在syncJob()中有个关键函数 manageJob(),它主要做的事情就是根据 job 配置的并发数来确认当前处于 active 的 pods 数量是否合理,如果不合理的话则进行调整。
具体实现如下:
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) { var activeLock sync.Mutex active := int32(len(activePods)) parallelism := *job.Spec.Parallelism // 获取job key, 根据 namespace + "/" + name进行拼接。 jobKey, err := controller.KeyFunc(job) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn"t get key for job %#v: %v", job, err)) return 0, nil } var errCh chan error // 如果处于active pods 大于job设置的并发数,则并发删除超出部分的active pods。 // 需要注意的是,需要删除的active pods是有一定的优先级的: // not-ready < ready;unscheduled < scheduled;pending < running。 // 先基于上面的优先级对activePods 进行排序,然后再从头执行删除操作。 // 如果删除pods失败,则需要回滚之前设置的ControllerExpectations 和 active 值。 if active > parallelism { diff := active - parallelism errCh = make(chan error, diff) jm.expectations.ExpectDeletions(jobKey, int(diff)) glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) sort.Sort(controller.ActivePods(activePods)) active -= diff wait := sync.WaitGroup{} wait.Add(int(diff)) for i := int32(0); i < diff; i++ { go func(ix int32) { defer wait.Done() if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil { defer utilruntime.HandleError(err) glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name) jm.expectations.DeletionObserved(jobKey) activeLock.Lock() active++ activeLock.Unlock() errCh <- err } }(i) } wait.Wait() // 如果active pods少于设置的并发值,则先计算diff值,具体的计算跟Completions和Parallelism的配置有关。 // 1.job.Spec.Completions == nil && succeeded pods > 0, 则diff = 0; // 2.job.Spec.Completions == nil && succeeded pods = 0,则diff = Parallelism; // 3.job.Spec.Completions != nil 则diff等于(job.Spec.Completions - succeeded - active)和parallelism中的最小值(非负值); // 计算好diff值即知道了还需要创建多少pods,由于等待创建的pods数量可能会非常庞大,所以这里有个分批创建的逻辑: // 第一批创建1个,第二批创建2个,后续按2的倍数继续往下分批创建,但是每次创建的数量都不会大于diff值(diff值每次都会减掉对应的分批数量)。 // 如果创建pod超时,则直接return; // 如果创建pod失败,则回滚ControllerExpectations的adds 和 active 值,并不在执行后续未执行的 pods. } else if active < parallelism { wantActive := int32(0) if job.Spec.Completions == nil { if succeeded > 0 { wantActive = active } else { wantActive = parallelism } } else { wantActive = *job.Spec.Completions - succeeded if wantActive > parallelism { wantActive = parallelism } } diff := wantActive - active if diff < 0 { utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)) diff = 0 } jm.expectations.ExpectCreations(jobKey, int(diff)) errCh = make(chan error, diff) glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff) active += diff wait := sync.WaitGroup{} // 分批创建 diff 数量的 pods for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) { errorCount := len(errCh) wait.Add(int(batchSize)) for i := int32(0); i < batchSize; i++ { go func() { defer wait.Done() err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind)) if err != nil && errors.IsTimeout(err) { return } if err != nil { defer utilruntime.HandleError(err) glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) jm.expectations.CreationObserved(jobKey) activeLock.Lock() active-- activeLock.Unlock() errCh <- err } }() } wait.Wait() // 如果这次分批创建pods有失败的情况,则不在处理后续未执行的pods // 需要计算剩余未执行的pods数量,并更新 ControllerExpectations 的 adds 和 active 值 skippedPods := diff - batchSize if errorCount < len(errCh) && skippedPods > 0 { glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name) active -= skippedPods for i := int32(0); i < skippedPods; i++ { jm.expectations.CreationObserved(jobKey) } break } diff -= batchSize } } select { case err := <-errCh: // 只要前面有错误产生,则返回出错并会将该job 继续丢入queue,等待下次sync if err != nil { return active, err } default: } return active, nil }
整个job controller实现流程到这里就结束了,后面会继续分析cronJob controller的源码实现!
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/32722.html
摘要:如果没有指定,则没有期限。取消当前正在运行的,然后新建来替换。和这两个字段也是可选的。设置限制值为,相关类型的完成后将不会被保留。列出所有的列出所有的遍历所有的根据字段确定该是否由所创建。 k8s version: v1.11.0author: lbl167612@alibaba-inc.com 源码流程图 showImg(https://segmentfault.com/img/r...
摘要:用于批量处理短暂的一次性任务,并保证指定数量的成功结束。一旦有一个成功结束,其他都会准备退出。默认值指定可运行的时间期限,超过时间还未结束,系统将会尝试进行终止。已知问题设置为时,会与冲突,可以暂时将设置为进行规避。 介绍 Kubernetes有两个概念跟job有关: Job: 负责批量处理短暂的一次性任务,仅执行一次,并保证处理的一个或者多个Pod成功结束。 CronJob: ...
摘要:执行容器内部运行的执行工作作为容器的执行驱动,负责创建容器运行命名空间,负责容器资源使用的统计与限制,负责容器内部进程的真正运行等。典型的在启动后,首先将设置为进行一系列检查然后将其切换为供用户使用。 在https://segmentfault.com/a/11... 容器,隔离,云的概述。这篇对其中用途广泛的docker,k8s做详细介绍,并给出云搭建的生态环境体系。 docker ...
摘要:执行容器内部运行的执行工作作为容器的执行驱动,负责创建容器运行命名空间,负责容器资源使用的统计与限制,负责容器内部进程的真正运行等。典型的在启动后,首先将设置为进行一系列检查然后将其切换为供用户使用。 在https://segmentfault.com/a/11... 容器,隔离,云的概述。这篇对其中用途广泛的docker,k8s做详细介绍,并给出云搭建的生态环境体系。 docker ...
阅读 3399·2021-11-22 09:34
阅读 1882·2019-08-30 12:53
阅读 3474·2019-08-28 18:07
阅读 2956·2019-08-27 10:55
阅读 2940·2019-08-26 10:12
阅读 3557·2019-08-23 18:21
阅读 1319·2019-08-23 14:10
阅读 1436·2019-08-23 13:04