/* Copyright 2017 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "context" "fmt" "hash/fnv" "sync" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" kubeschedulinginformers "k8s.io/client-go/informers/scheduling/v1" "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" kubeschedulinglisters "k8s.io/client-go/listers/scheduling/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" batchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" vcclientset "volcano.sh/apis/pkg/client/clientset/versioned" vcscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme" vcinformer "volcano.sh/apis/pkg/client/informers/externalversions" batchinformer "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1" businformer "volcano.sh/apis/pkg/client/informers/externalversions/bus/v1alpha1" schedulinginformers "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" batchlister "volcano.sh/apis/pkg/client/listers/batch/v1alpha1" buslister "volcano.sh/apis/pkg/client/listers/bus/v1alpha1" schedulinglisters "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" jobcache "volcano.sh/volcano/pkg/controllers/cache" "volcano.sh/volcano/pkg/controllers/framework" "volcano.sh/volcano/pkg/controllers/job/state" "volcano.sh/volcano/pkg/features" ) func init() { framework.RegisterController(&jobcontroller{}) } type delayAction struct { // The namespacing name of the job jobKey string // The name of the task taskName string // The name of the pod podName string // The UID of the pod podUID types.UID // The event caused the action event busv1alpha1.Event // The action to take. action busv1alpha1.Action // The delay before the action is executed delay time.Duration // The cancel function of the action cancel context.CancelFunc } // jobcontroller the Job jobcontroller type. 
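// A jobcontroller fans incoming Job, Pod, PodGroup and Command events out to a fixed number of worker
// goroutines. Every event is converted into an apis.Request, hashed by its job key (namespace/name), and
// pushed onto the matching rate-limited queue, so all events for a given Job are handled by the same worker:
//
//	key := jobcache.JobKeyByReq(&req) // "namespace/name"
//	queue := cc.queueList[cc.genHash(key)%cc.workers]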
type jobcontroller struct { kubeClient kubernetes.Interface vcClient vcclientset.Interface jobInformer batchinformer.JobInformer podInformer coreinformers.PodInformer pvcInformer coreinformers.PersistentVolumeClaimInformer pgInformer schedulinginformers.PodGroupInformer svcInformer coreinformers.ServiceInformer cmdInformer businformer.CommandInformer pcInformer kubeschedulinginformers.PriorityClassInformer queueInformer schedulinginformers.QueueInformer informerFactory informers.SharedInformerFactory vcInformerFactory vcinformer.SharedInformerFactory // A store of jobs jobLister batchlister.JobLister jobSynced func() bool // A store of pods podLister corelisters.PodLister podSynced func() bool pvcLister corelisters.PersistentVolumeClaimLister pvcSynced func() bool // A store of podgroups pgLister schedulinglisters.PodGroupLister pgSynced func() bool // A store of service svcLister corelisters.ServiceLister svcSynced func() bool cmdLister buslister.CommandLister cmdSynced func() bool pcLister kubeschedulinglisters.PriorityClassLister pcSynced func() bool queueLister schedulinglisters.QueueLister queueSynced func() bool // queue that need to sync up queueList []workqueue.TypedRateLimitingInterface[any] commandQueue workqueue.TypedRateLimitingInterface[any] cache jobcache.Cache // Job Event recorder recorder record.EventRecorder errTasks workqueue.TypedRateLimitingInterface[any] workers uint32 maxRequeueNum int delayActionMapLock sync.RWMutex // delayActionMap stores delayed actions for jobs, where outer map key is job key (namespace/name), // inner map key is pod name, and value is the delayed action to be performed delayActionMap map[string]map[string]*delayAction } func (cc *jobcontroller) Name() string { return "job-controller" } // Initialize creates the new Job controller. 
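// It sets up the event recorder, one rate-limited work queue per worker, the job cache, and the informers for
// Jobs, Commands, Pods, PVCs, Services, PodGroups, PriorityClasses and Queues (the Job, Command and
// PriorityClass informers are only registered when the VolcanoJobSupport, QueueCommandSync and PriorityClass
// feature gates are enabled, respectively), and finally registers the syncJob, killJob and killTarget
// callbacks with the job state machine.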
func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { cc.kubeClient = opt.KubeClient cc.vcClient = opt.VolcanoClient sharedInformers := opt.SharedInformerFactory workers := opt.WorkerNum // Initialize event client eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"}) cc.informerFactory = sharedInformers cc.queueList = make([]workqueue.TypedRateLimitingInterface[any], workers) cc.commandQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) cc.cache = jobcache.New() cc.errTasks = newRateLimitingQueue() cc.recorder = recorder cc.workers = workers cc.maxRequeueNum = opt.MaxRequeueNum if cc.maxRequeueNum < 0 { cc.maxRequeueNum = -1 } var i uint32 for i = 0; i < workers; i++ { cc.queueList[i] = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) } factory := opt.VCSharedInformerFactory cc.vcInformerFactory = factory if utilfeature.DefaultFeatureGate.Enabled(features.VolcanoJobSupport) { cc.jobInformer = factory.Batch().V1alpha1().Jobs() cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: cc.addJob, UpdateFunc: cc.updateJob, DeleteFunc: cc.deleteJob, }) cc.jobLister = cc.jobInformer.Lister() cc.jobSynced = cc.jobInformer.Informer().HasSynced } if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) { cc.cmdInformer = factory.Bus().V1alpha1().Commands() cc.cmdInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch v := obj.(type) { case *busv1alpha1.Command: if v.TargetObject != nil && v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() && v.TargetObject.Kind == "Job" { return true } return false default: return false } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: cc.addCommand, }, }, ) cc.cmdLister = cc.cmdInformer.Lister() cc.cmdSynced = cc.cmdInformer.Informer().HasSynced } cc.podInformer = sharedInformers.Core().V1().Pods() cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: cc.addPod, UpdateFunc: cc.updatePod, DeleteFunc: cc.deletePod, }) cc.podLister = cc.podInformer.Lister() cc.podSynced = cc.podInformer.Informer().HasSynced cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims() cc.pvcLister = cc.pvcInformer.Lister() cc.pvcSynced = cc.pvcInformer.Informer().HasSynced cc.svcInformer = sharedInformers.Core().V1().Services() cc.svcLister = cc.svcInformer.Lister() cc.svcSynced = cc.svcInformer.Informer().HasSynced cc.pgInformer = factory.Scheduling().V1beta1().PodGroups() cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: cc.updatePodGroup, }) cc.pgLister = cc.pgInformer.Lister() cc.pgSynced = cc.pgInformer.Informer().HasSynced if utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses() cc.pcLister = cc.pcInformer.Lister() cc.pcSynced = cc.pcInformer.Informer().HasSynced } cc.queueInformer = factory.Scheduling().V1beta1().Queues() cc.queueLister = cc.queueInformer.Lister() cc.queueSynced = cc.queueInformer.Informer().HasSynced cc.delayActionMap = make(map[string]map[string]*delayAction) // Register actions state.SyncJob = cc.syncJob state.KillJob = cc.killJob 
state.KillTarget = cc.killTarget return nil } // Run start JobController. func (cc *jobcontroller) Run(stopCh <-chan struct{}) { cc.informerFactory.Start(stopCh) cc.vcInformerFactory.Start(stopCh) for informerType, ok := range cc.informerFactory.WaitForCacheSync(stopCh) { if !ok { klog.Errorf("caches failed to sync: %v", informerType) return } } for informerType, ok := range cc.vcInformerFactory.WaitForCacheSync(stopCh) { if !ok { klog.Errorf("caches failed to sync: %v", informerType) return } } go wait.Until(cc.handleCommands, 0, stopCh) var i uint32 for i = 0; i < cc.workers; i++ { go func(num uint32) { wait.Until( func() { cc.worker(num) }, time.Second, stopCh) }(i) } go cc.cache.Run(stopCh) // Re-sync error tasks. go wait.Until(cc.processResyncTask, 0, stopCh) klog.Infof("JobController is running ...... ") } func (cc *jobcontroller) worker(i uint32) { klog.Infof("worker %d start ...... ", i) for cc.processNextReq(i) { } } func (cc *jobcontroller) belongsToThisRoutine(key string, count uint32) bool { val := cc.genHash(key) return val%cc.workers == count } func (cc *jobcontroller) getWorkerQueue(key string) workqueue.TypedRateLimitingInterface[any] { val := cc.genHash(key) queue := cc.queueList[val%cc.workers] return queue } func (cc *jobcontroller) genHash(key string) uint32 { hashVal := fnv.New32() hashVal.Write([]byte(key)) return hashVal.Sum32() } func (cc *jobcontroller) processNextReq(count uint32) bool { queue := cc.queueList[count] obj, shutdown := queue.Get() if shutdown { klog.Errorf("Fail to pop item from queue") return false } req := obj.(apis.Request) defer queue.Done(req) key := jobcache.JobKeyByReq(&req) if !cc.belongsToThisRoutine(key, count) { klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count) queueLocal := cc.getWorkerQueue(key) queueLocal.Add(req) return true } klog.V(3).Infof("Try to handle request <%v>", req) cc.CleanPodDelayActionsIfNeed(req) jobInfo, err := cc.cache.Get(key) if err != nil { // TODO(k82cn): ignore not-ready error. klog.Errorf("Failed to get job by <%v> from cache: %v", req, err) return true } st := state.NewState(jobInfo) if st == nil { klog.Errorf("Invalid state <%s> of Job <%v/%v>", jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name) return true } delayAct := applyPolicies(jobInfo.Job, &req) if delayAct.delay != 0 { klog.V(3).Infof("Execute <%v> on Job <%s/%s> after %s", delayAct.action, req.Namespace, req.JobName, delayAct.delay.String()) cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf( "Execute action %s after %s", delayAct.action, delayAct.delay.String())) cc.AddDelayActionForJob(req, delayAct) return true } klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.", delayAct.action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st) if delayAct.action != busv1alpha1.SyncJobAction { cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf( "Start to execute action %s ", delayAct.action)) } action := GetStateAction(delayAct) if err := st.Execute(action); err != nil { cc.handleJobError(queue, req, st, err, delayAct.action) return true } // If no error, forget it. 
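// Forget clears the rate limiter's failure history for this request; on error, handleJobError either requeues
// the request with backoff or, once maxRequeueNum is exceeded, terminates the Job and drops the request.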
queue.Forget(req) // If the action is not an internal action, cancel all delayed actions if !isInternalAction(delayAct.action) { cc.cleanupDelayActions(delayAct) } return true } // CleanPodDelayActionsIfNeed is used to clean delayed actions for Pod events when the pod phase changed: // if the event is not PodPending event: // - cancel corresponding Pod Pending delayed action // - if the event is PodRunning state, cancel corresponding Pod Failed and Pod Evicted delayed actions func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) { // Skip cleaning delayed actions for non-pod events if !cc.isPodEvent(req) { return } if req.Event != busv1alpha1.PodPendingEvent { key := jobcache.JobKeyByReq(&req) cc.delayActionMapLock.Lock() defer cc.delayActionMapLock.Unlock() if taskMap, exists := cc.delayActionMap[key]; exists { if delayAct, exists := taskMap[req.PodName]; exists { shouldCancel := false if delayAct.event == busv1alpha1.PodPendingEvent { // For PodPending delayed action, we need to check if the Pod UID matches // Because if a Pod is deleted and immediately recreated, // the new Pod's pending event may be queued before the old Pod's delete event if req.PodUID == delayAct.podUID { shouldCancel = true } } if (delayAct.event == busv1alpha1.PodFailedEvent || delayAct.event == busv1alpha1.PodEvictedEvent) && req.Event == busv1alpha1.PodRunningEvent { shouldCancel = true } if shouldCancel { klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> of Job <%s>", delayAct.action, req.PodName, req.Event, delayAct.jobKey) delayAct.cancel() delete(taskMap, req.PodName) } } } } } func (cc *jobcontroller) isPodEvent(req apis.Request) bool { return req.Event == busv1alpha1.PodPendingEvent || req.Event == busv1alpha1.PodRunningEvent || req.Event == busv1alpha1.PodFailedEvent || req.Event == busv1alpha1.PodEvictedEvent } func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayAction) { cc.delayActionMapLock.Lock() defer cc.delayActionMapLock.Unlock() m, ok := cc.delayActionMap[delayAct.jobKey] if !ok { m = make(map[string]*delayAction) cc.delayActionMap[delayAct.jobKey] = m } if oldDelayAct, exists := m[req.PodName]; exists && oldDelayAct.action == delayAct.action { return } m[req.PodName] = delayAct ctx, cancel := context.WithTimeout(context.Background(), delayAct.delay) delayAct.cancel = cancel go func() { <-ctx.Done() if ctx.Err() == context.Canceled { klog.V(4).Infof("Job<%s/%s>'s delayed action %s is canceled", req.Namespace, req.JobName, delayAct.action) return } klog.V(4).Infof("Job<%s/%s>'s delayed action %s is expired, execute it", req.Namespace, req.JobName, delayAct.action) jobInfo, err := cc.cache.Get(delayAct.jobKey) if err != nil { klog.Errorf("Failed to get job by <%v> from cache: %v", req, err) return } st := state.NewState(jobInfo) if st == nil { klog.Errorf("Invalid state <%s> of Job <%v/%v>", jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name) return } queue := cc.getWorkerQueue(delayAct.jobKey) if err := st.Execute(GetStateAction(delayAct)); err != nil { cc.handleJobError(queue, req, st, err, delayAct.action) } queue.Forget(req) cc.cleanupDelayActions(delayAct) }() } func (cc *jobcontroller) handleJobError(queue workqueue.TypedRateLimitingInterface[any], req apis.Request, st state.State, err error, action busv1alpha1.Action) { if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum { klog.V(2).Infof("Failed to handle Job <%s/%s>: %v", req.Namespace, req.JobName, err) queue.AddRateLimited(req) return 
} cc.recordJobEvent(req.Namespace, req.JobName, batchv1alpha1.ExecuteAction, fmt.Sprintf("Job failed on action %s because the retry limit was reached", action)) klog.Warningf("Terminating Job <%s/%s> and releasing resources", req.Namespace, req.JobName) if err = st.Execute(state.Action{Action: busv1alpha1.TerminateJobAction}); err != nil { klog.Errorf("Failed to terminate Job <%s/%s>: %v", req.Namespace, req.JobName, err) } klog.Warningf("Dropping job <%s/%s> out of the queue because the max retry limit has been reached: %v", req.Namespace, req.JobName, err) }
// cleanupDelayActions cleans up delayed actions.
// After a delayed action has been executed, the other delayed actions of the same type must be cancelled to avoid duplicate execution.
// Parameters:
// - currentDelayAction: the delayed action that has just been executed
//
// Implementation logic:
// 1. Determine the type of the current delayed action (Job, Task or Pod level)
// 2. Iterate through all delayed actions registered under this Job
// 3. Cancel every delayed action whose type matches and remove it from the map
//
// Usage scenario:
// - When a Pod failure triggers Job termination, the other Job-level delayed actions registered under this Job need to be cancelled
func (cc *jobcontroller) cleanupDelayActions(currentDelayAction *delayAction) { cc.delayActionMapLock.Lock() defer cc.delayActionMapLock.Unlock() actionType := GetActionType(currentDelayAction.action) if m, exists := cc.delayActionMap[currentDelayAction.jobKey]; exists { for _, delayAct := range m { if GetActionType(delayAct.action) == actionType { // For Task level actions, only cancel delayed actions for the same task if actionType == TaskAction && delayAct.taskName != currentDelayAction.taskName { continue } // For Pod level actions, only cancel delayed actions for the same pod if actionType == PodAction && delayAct.podName != currentDelayAction.podName { continue } if delayAct.cancel != nil { klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> and action <%s> of Job <%s>", delayAct.action, delayAct.podName, currentDelayAction.event, currentDelayAction.action, delayAct.jobKey) delayAct.cancel() } delete(m, delayAct.podName) } } } }
/* Copyright 2019 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "context" "fmt" "reflect" "strings" "sync" "sync/atomic" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/apis/helpers" scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/controllers/job/state" ) var calMutex sync.Mutex // getPodGroupByJob returns the podgroup related to the vcjob. // it will return normal pg if it exist in cluster, // else it return legacy pg before version 1.5. func (cc *jobcontroller) getPodGroupByJob(job *batch.Job) (*scheduling.PodGroup, error) { pgName := cc.generateRelatedPodGroupName(job) pg, err := cc.pgLister.PodGroups(job.Namespace).Get(pgName) if err == nil { return pg, nil } if apierrors.IsNotFound(err) { pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name) if err != nil { return nil, err } return pg, nil } return nil, err } func (cc *jobcontroller) generateRelatedPodGroupName(job *batch.Job) string { return fmt.Sprintf("%s-%s", job.Name, string(job.UID)) } func (cc *jobcontroller) killTarget(jobInfo *apis.JobInfo, target state.Target, updateStatus state.UpdateStatusFn) error { if target.Type == state.TargetTypeTask { klog.V(3).Infof("Killing task <%s> of Job <%s/%s>, current version %d", target.TaskName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) defer klog.V(3).Infof("Finished task <%s> of Job <%s/%s> killing, current version %d", target.TaskName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) } else if target.Type == state.TargetTypePod { klog.V(3).Infof("Killing pod <%s> of Job <%s/%s>, current version %d", target.PodName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) defer klog.V(3).Infof("Finished pod <%s> of Job <%s/%s> killing, current version %d", target.PodName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) } return cc.killPods(jobInfo, nil, &target, updateStatus) } func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error { klog.V(3).Infof("Killing Job <%s/%s>, current version %d", jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) defer klog.V(3).Infof("Finished Job <%s/%s> killing, current version %d", jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) return cc.killPods(jobInfo, podRetainPhase, nil, updateStatus) } func (cc *jobcontroller) killPods(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, target *state.Target, updateStatus state.UpdateStatusFn) error { job := jobInfo.Job if job.DeletionTimestamp != nil { klog.Infof("Job <%s/%s> is terminating, skip management process.", job.Namespace, job.Name) return nil } var pending, running, terminating, succeeded, failed, unknown int32 
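// The counters above are accumulated only for pods handled in this kill pass: pods that are already
// terminating or are deleted successfully count as terminating, while pods whose deletion failed are
// classified by their current phase; the totals are written back into job.Status below.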
taskStatusCount := make(map[string]batch.TaskState) var errs []error var total int podsToKill := make(map[string]*v1.Pod) if target != nil { if target.Type == state.TargetTypeTask { podsToKill = jobInfo.Pods[target.TaskName] } else if target.Type == state.TargetTypePod { podsToKill[target.PodName] = jobInfo.Pods[target.TaskName][target.PodName] } total += len(podsToKill) } else { // Job version is bumped only when job is killed job.Status.Version++ for _, pods := range jobInfo.Pods { for _, pod := range pods { total++ if pod.DeletionTimestamp != nil { klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) terminating++ continue } maxRetry := job.Spec.MaxRetry lastRetry := false if job.Status.RetryCount >= maxRetry-1 { lastRetry = true } // Only retain the Failed and Succeeded pods at the last retry. // If it is not the last retry, kill pod as defined in `podRetainPhase`. retainPhase := podRetainPhase if lastRetry { retainPhase = state.PodRetainPhaseSoft } _, retain := retainPhase[pod.Status.Phase] if !retain { podsToKill[pod.Name] = pod } } } } for _, pod := range podsToKill { if pod.DeletionTimestamp != nil { klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) terminating++ continue } err := cc.deleteJobPod(job.Name, pod) if err == nil { terminating++ continue } // record the err, and then collect the pod info like retained pod errs = append(errs, err) cc.resyncTask(pod) classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown) calcPodStatus(pod, taskStatusCount) } if len(errs) != 0 { klog.Errorf("failed to kill pods for job %s/%s, with err %+v", job.Namespace, job.Name, errs) cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason, fmt.Sprintf("Error deleting pods: %+v", errs)) return fmt.Errorf("failed to kill %d pods of %d", len(errs), total) } job = job.DeepCopy() job.Status.Pending = pending job.Status.Running = running job.Status.Succeeded = succeeded job.Status.Failed = failed job.Status.Terminating = terminating job.Status.Unknown = unknown job.Status.TaskStatusCount = taskStatusCount if updateStatus != nil { if updateStatus(&job.Status) { job.Status.State.LastTransitionTime = metav1.Now() jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) job.Status.Conditions = append(job.Status.Conditions, jobCondition) } } // Update running duration runningDuration := metav1.Duration{Duration: job.Status.State.LastTransitionTime.Sub(jobInfo.Job.CreationTimestamp.Time)} klog.V(3).Infof("Running duration is %s", runningDuration.ToUnstructured()) job.Status.RunningDuration = &runningDuration // must be called before update job status if err := cc.pluginOnJobDelete(job); err != nil { return err } // Update Job status newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{}) if apierrors.IsNotFound(err) { klog.Errorf("Job %v/%v was not found", job.Namespace, job.Name) return nil } if err != nil { klog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return err } if e := cc.cache.Update(newJob); e != nil { klog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v", newJob.Namespace, newJob.Name, e) return e } // Delete PodGroup pg, err := cc.getPodGroupByJob(job) if err != nil && !apierrors.IsNotFound(err) { klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error()) return err } if pg != nil { if err := 
cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pg.Name, metav1.DeleteOptions{}); err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Failed to delete PodGroup of Job %s/%s: %v", job.Namespace, job.Name, err) return err } } } // NOTE(k82cn): DO NOT delete input/output until job is deleted. return nil } func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) { klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name) jobInstance, err := cc.initJobStatus(job) if err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError), fmt.Sprintf("Failed to initialize job status, err: %v", err)) return nil, err } if err := cc.pluginOnJobAdd(jobInstance); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError), fmt.Sprintf("Execute plugin when job add failed, err: %v", err)) return nil, err } newJob, err := cc.createJobIOIfNotExist(jobInstance) if err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PVCError), fmt.Sprintf("Failed to create PVC, err: %v", err)) return nil, err } if err := cc.createOrUpdatePodGroup(newJob); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError), fmt.Sprintf("Failed to create PodGroup, err: %v", err)) return nil, err } return newJob, nil } func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error { klog.V(3).Infof("Starting to initiate Job <%s/%s> on update", job.Namespace, job.Name) if err := cc.pluginOnJobUpdate(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError), fmt.Sprintf("Execute plugin when job add failed, err: %v", err)) return err } if err := cc.createOrUpdatePodGroup(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError), fmt.Sprintf("Failed to create PodGroup, err: %v", err)) return err } return nil } func (cc *jobcontroller) GetQueueInfo(queue string) (*scheduling.Queue, error) { queueInfo, err := cc.queueLister.Get(queue) if err != nil { klog.Errorf("Failed to get queue from queueLister, error: %s", err.Error()) } return queueInfo, err } func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error { job := jobInfo.Job klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version) defer klog.V(3).Infof("Finished Job <%s/%s> sync up, current version %d", job.Namespace, job.Name, job.Status.Version) if jobInfo.Job.DeletionTimestamp != nil { klog.Infof("Job <%s/%s> is terminating, skip management process.", jobInfo.Job.Namespace, jobInfo.Job.Name) return nil } // deep copy job to prevent mutate it job = job.DeepCopy() // Find queue that job belongs to, and check if the queue has forwarding metadata queueInfo, err := cc.GetQueueInfo(job.Spec.Queue) if err != nil { return err } var jobForwarding bool if len(queueInfo.Spec.ExtendClusters) != 0 { jobForwarding = true if len(job.Annotations) == 0 { job.Annotations = make(map[string]string) } job.Annotations[batch.JobForwardingKey] = "true" job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error()) return err } } // Skip job initiation if job is already initiated if !isInitiated(job) { if job, err = cc.initiateJob(job); err != nil { return err } } else { // TODO: optimize this call it only when scale up/down if err = 
cc.initOnJobUpdate(job); err != nil { return err } } if len(queueInfo.Spec.ExtendClusters) != 0 { jobForwarding = true job.Annotations[batch.JobForwardingKey] = "true" _, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error()) return err } } var syncTask bool pg, err := cc.getPodGroupByJob(job) if err != nil && !apierrors.IsNotFound(err) { klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error()) return err } if pg != nil { if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending { syncTask = true } cc.recordPodGroupEvent(job, pg) } var jobCondition batch.JobCondition oldStatus := job.Status if !syncTask { if updateStatus != nil { updateStatus(&job.Status) } if equality.Semantic.DeepEqual(job.Status, oldStatus) { klog.V(4).Infof("Job <%s/%s> has not updated for no changing", job.Namespace, job.Name) return nil } job.Status.State.LastTransitionTime = metav1.Now() jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) job.Status.Conditions = append(job.Status.Conditions, jobCondition) newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { klog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return err } if e := cc.cache.Update(newJob); e != nil { klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v", newJob.Namespace, newJob.Name, e) return e } return nil } var running, pending, terminating, succeeded, failed, unknown int32 taskStatusCount := make(map[string]batch.TaskState) podToCreate := make(map[string][]*v1.Pod) var podToDelete []*v1.Pod var creationErrs []error var deletionErrs []error appendMutex := sync.Mutex{} appendError := func(container *[]error, err error) { appendMutex.Lock() defer appendMutex.Unlock() *container = append(*container, err) } waitCreationGroup := sync.WaitGroup{} for _, ts := range job.Spec.Tasks { ts.Template.Name = ts.Name tc := ts.Template.DeepCopy() name := ts.Template.Name pods, found := jobInfo.Pods[name] if !found { pods = map[string]*v1.Pod{} } var podToCreateEachTask []*v1.Pod for i := 0; i < int(ts.Replicas); i++ { podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i) if pod, found := pods[podName]; !found { newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding) if err := cc.pluginOnPodCreate(job, newPod); err != nil { return err } podToCreateEachTask = append(podToCreateEachTask, newPod) waitCreationGroup.Add(1) } else { delete(pods, podName) if pod.DeletionTimestamp != nil { klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) atomic.AddInt32(&terminating, 1) continue } classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown) calcPodStatus(pod, taskStatusCount) } } podToCreate[ts.Name] = podToCreateEachTask for _, pod := range pods { podToDelete = append(podToDelete, pod) } } for taskName, podToCreateEachTask := range podToCreate { if len(podToCreateEachTask) == 0 { continue } go func(taskName string, podToCreateEachTask []*v1.Pod) { taskIndex := jobhelpers.GetTaskIndexUnderJob(taskName, job) if job.Spec.Tasks[taskIndex].DependsOn != nil { if !cc.waitDependsOnTaskMeetCondition(taskIndex, job) { klog.V(3).Infof("Job %s/%s depends on task not ready", job.Name, job.Namespace) // release wait group for _, pod := 
range podToCreateEachTask { go func(pod *v1.Pod) { defer waitCreationGroup.Done() }(pod) } return } } for _, pod := range podToCreateEachTask { go func(pod *v1.Pod) { defer waitCreationGroup.Done() newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { // Failed to create Pod, waitCreationGroup a moment and then create it again // This is to ensure all podsMap under the same Job created // So gang-scheduling could schedule the Job successfully klog.Errorf("Failed to create pod %s for Job %s, err %#v", pod.Name, job.Name, err) appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err)) } else { classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown) calcPodStatus(newPod, taskStatusCount) klog.V(5).Infof("Created Task <%s> of Job <%s/%s>", pod.Name, job.Namespace, job.Name) } }(pod) } }(taskName, podToCreateEachTask) } waitCreationGroup.Wait() if len(creationErrs) != 0 { cc.recorder.Event(job, v1.EventTypeWarning, FailedCreatePodReason, fmt.Sprintf("Error creating pods: %+v", creationErrs)) return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate)) } // Delete pods when scale down. waitDeletionGroup := sync.WaitGroup{} waitDeletionGroup.Add(len(podToDelete)) for _, pod := range podToDelete { go func(pod *v1.Pod) { defer waitDeletionGroup.Done() err := cc.deleteJobPod(job.Name, pod) if err != nil { // Failed to delete Pod, waitCreationGroup a moment and then create it again // This is to ensure all podsMap under the same Job created // So gang-scheduling could schedule the Job successfully klog.Errorf("Failed to delete pod %s for Job %s, err %#v", pod.Name, job.Name, err) appendError(&deletionErrs, err) cc.resyncTask(pod) } else { klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>", pod.Name, job.Namespace, job.Name) atomic.AddInt32(&terminating, 1) } }(pod) } waitDeletionGroup.Wait() if len(deletionErrs) != 0 { cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason, fmt.Sprintf("Error deleting pods: %+v", deletionErrs)) return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete)) } newStatus := batch.JobStatus{ State: job.Status.State, Pending: pending, Running: running, Succeeded: succeeded, Failed: failed, Terminating: terminating, Unknown: unknown, Version: job.Status.Version, MinAvailable: job.Spec.MinAvailable, TaskStatusCount: taskStatusCount, ControlledResources: job.Status.ControlledResources, Conditions: job.Status.Conditions, RetryCount: job.Status.RetryCount, } if updateStatus != nil { updateStatus(&newStatus) } if reflect.DeepEqual(job.Status, newStatus) { klog.V(3).Infof("Job <%s/%s> has not updated for no changing", job.Namespace, job.Name) return nil } job.Status = newStatus job.Status.State.LastTransitionTime = metav1.Now() jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) job.Status.Conditions = append(job.Status.Conditions, jobCondition) newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { klog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return err } if e := cc.cache.Update(newJob); e != nil { klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v", newJob.Namespace, newJob.Name, e) return e } return nil } func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskIndex 
int, job *batch.Job) bool { if job.Spec.Tasks[taskIndex].DependsOn == nil { return true } dependsOn := *job.Spec.Tasks[taskIndex].DependsOn if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny { // any ready to create task, return true for _, task := range dependsOn.Name { if cc.isDependsOnPodsReady(task, job) { return true } } // all not ready to skip create task, return false return false } for _, dependsOnTask := range dependsOn.Name { // any not ready to skip create task, return false if !cc.isDependsOnPodsReady(dependsOnTask, job) { return false } } // all ready to create task, return true return true } func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool { dependsOnPods := jobhelpers.GetPodsNameUnderTask(task, job) dependsOnTaskIndex := jobhelpers.GetTaskIndexUnderJob(task, job) runningPodCount := 0 for _, podName := range dependsOnPods { pod, err := cc.podLister.Pods(job.Namespace).Get(podName) if err != nil { // If pod is not found. There are 2 possibilities. // 1. vcjob has been deleted. This function should return true. // 2. pod is not created. This function should return false, continue waiting. if apierrors.IsNotFound(err) { _, errGetJob := cc.jobLister.Jobs(job.Namespace).Get(job.Name) if errGetJob != nil { return apierrors.IsNotFound(errGetJob) } } klog.Errorf("Failed to get pod %v/%v %v", job.Namespace, podName, err) continue } if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodSucceeded { klog.V(5).Infof("Sequential state, pod %v/%v of depends on tasks is not running", pod.Namespace, pod.Name) continue } allContainerReady := true for _, containerStatus := range pod.Status.ContainerStatuses { if !containerStatus.Ready { allContainerReady = false break } } if allContainerReady { runningPodCount++ } } dependsOnTaskMinReplicas := job.Spec.Tasks[dependsOnTaskIndex].MinAvailable if dependsOnTaskMinReplicas != nil { if runningPodCount < int(*dependsOnTaskMinReplicas) { klog.V(5).Infof("In a depends on startup state, there are already %d pods running, which is less than the minimum number of runs", runningPodCount) return false } } return true } func (cc *jobcontroller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) { // If PVC does not exist, create them for Job. var needUpdate bool if job.Status.ControlledResources == nil { job.Status.ControlledResources = make(map[string]string) } for index, volume := range job.Spec.Volumes { vcName := volume.VolumeClaimName if len(vcName) == 0 { // NOTE(k82cn): Ensure never have duplicated generated names. 
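// Keep generating candidate PVC names until one is not already present in the namespace, then record the
// chosen name on the volume spec and mark the Job for update.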
for { vcName = jobhelpers.GenPVCName(job.Name) exist, err := cc.checkPVCExist(job, vcName) if err != nil { return job, err } if exist { continue } job.Spec.Volumes[index].VolumeClaimName = vcName needUpdate = true break } // TODO: check VolumeClaim must be set if VolumeClaimName is empty if volume.VolumeClaim != nil { if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil { return job, err } } } else { exist, err := cc.checkPVCExist(job, vcName) if err != nil { return job, err } if !exist { return job, fmt.Errorf("pvc %s is not found, the job will be in the Pending state until the PVC is created", vcName) } } job.Status.ControlledResources["volume-pvc-"+vcName] = vcName } if needUpdate { newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { klog.Errorf("Failed to update Job %v/%v for volume claim name: %v ", job.Namespace, job.Name, err) return job, err } newJob.Status = job.Status return newJob, err } return job, nil } func (cc *jobcontroller) checkPVCExist(job *batch.Job, pvc string) (bool, error) { if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(pvc); err != nil { if apierrors.IsNotFound(err) { return false, nil } klog.V(3).Infof("Failed to get PVC %s for job <%s/%s>: %v", pvc, job.Namespace, job.Name, err) return false, err } return true, nil } func (cc *jobcontroller) createPVC(job *batch.Job, vcName string, volumeClaim *v1.PersistentVolumeClaimSpec) error { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Namespace: job.Namespace, Name: vcName, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), }, }, Spec: *volumeClaim, } klog.V(3).Infof("Try to create PVC: %v", pvc) if _, e := cc.kubeClient.CoreV1().PersistentVolumeClaims(job.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{}); e != nil { klog.V(3).Infof("Failed to create PVC for Job <%s/%s>: %v", job.Namespace, job.Name, e) return e } return nil } func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error { // If PodGroup does not exist, create one for Job. 
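// Look the PodGroup up first (under the <job name>-<job UID> name, falling back to the legacy job name); if it
// is missing, build a new one from the Job's minAvailable, per-task minAvailable, queue, priority class,
// minimal resources and optional network topology, otherwise compare those fields and update the PodGroup only
// when something changed.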
pg, err := cc.getPodGroupByJob(job) if err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v", job.Namespace, job.Name, err) return err } minTaskMember := map[string]int32{} for _, task := range job.Spec.Tasks { if task.MinAvailable != nil { minTaskMember[task.Name] = *task.MinAvailable } else { minTaskMember[task.Name] = task.Replicas } } pg := &scheduling.PodGroup{ ObjectMeta: metav1.ObjectMeta{ Namespace: job.Namespace, // add job.UID into its name when create new PodGroup Name: cc.generateRelatedPodGroupName(job), Annotations: job.Annotations, Labels: job.Labels, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), }, }, Spec: scheduling.PodGroupSpec{ MinMember: job.Spec.MinAvailable, MinTaskMember: minTaskMember, Queue: job.Spec.Queue, MinResources: cc.calcPGMinResources(job), PriorityClassName: job.Spec.PriorityClassName, }, } if job.Spec.NetworkTopology != nil { nt := &scheduling.NetworkTopologySpec{ Mode: scheduling.NetworkTopologyMode(job.Spec.NetworkTopology.Mode), } if job.Spec.NetworkTopology.HighestTierAllowed != nil { nt.HighestTierAllowed = job.Spec.NetworkTopology.HighestTierAllowed } pg.Spec.NetworkTopology = nt } if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil { if !apierrors.IsAlreadyExists(err) { klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v", job.Namespace, job.Name, err) return err } } return nil } pgShouldUpdate := false if pg.Spec.PriorityClassName != job.Spec.PriorityClassName { pg.Spec.PriorityClassName = job.Spec.PriorityClassName pgShouldUpdate = true } minResources := cc.calcPGMinResources(job) if pg.Spec.MinMember != job.Spec.MinAvailable || !equality.Semantic.DeepEqual(pg.Spec.MinResources, minResources) { pg.Spec.MinMember = job.Spec.MinAvailable pg.Spec.MinResources = minResources pgShouldUpdate = true } if pg.Spec.MinTaskMember == nil { pgShouldUpdate = true pg.Spec.MinTaskMember = make(map[string]int32) } for _, task := range job.Spec.Tasks { cnt := task.Replicas if task.MinAvailable != nil { cnt = *task.MinAvailable } if taskMember, ok := pg.Spec.MinTaskMember[task.Name]; !ok { pgShouldUpdate = true pg.Spec.MinTaskMember[task.Name] = cnt } else { if taskMember == cnt { continue } pgShouldUpdate = true pg.Spec.MinTaskMember[task.Name] = cnt } } if !pgShouldUpdate { return nil } _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{}) if err != nil { klog.V(3).Infof("Failed to update PodGroup for Job <%s/%s>: %v", job.Namespace, job.Name, err) } return err } func (cc *jobcontroller) deleteJobPod(jobName string, pod *v1.Pod) error { err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { klog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v", pod.Namespace, pod.Name, jobName, err) return fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err) } return nil } func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList { // sort task by priorityClasses var tasksPriority TasksPriority totalMinAvailable := int32(0) for _, task := range job.Spec.Tasks { tp := TaskPriority{0, task} pc := task.Template.Spec.PriorityClassName if pc != "" { priorityClass, err := cc.pcLister.Get(pc) if err != nil || priorityClass == nil { klog.Warningf("Ignore task %s priority class %s: %v", task.Name, pc, err) } else { 
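// Rank the task by the integer Value of the resolved PriorityClass; tasks whose priority class cannot be
// resolved keep priority 0 and only produce the warning above.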
tp.priority = priorityClass.Value } } tasksPriority = append(tasksPriority, tp) if task.MinAvailable != nil { // actually, it can not be nil, because nil value will be patched in webhook totalMinAvailable += *task.MinAvailable } else { totalMinAvailable += task.Replicas } } // see docs https://github.com/volcano-sh/volcano/pull/2945 // 1. job.MinAvailable < sum(task.MinAvailable), regard podgroup's min resource as sum of the first minAvailable, // according to https://github.com/volcano-sh/volcano/blob/c91eb07f2c300e4d5c826ff11a63b91781b3ac11/pkg/scheduler/api/job_info.go#L738-L740 if job.Spec.MinAvailable < totalMinAvailable { minReq := tasksPriority.CalcFirstCountResources(job.Spec.MinAvailable) return &minReq } // 2. job.MinAvailable >= sum(task.MinAvailable) minReq := tasksPriority.CalcPGMinResources(job.Spec.MinAvailable) return &minReq } func (cc *jobcontroller) initJobStatus(job *batch.Job) (*batch.Job, error) { if job.Status.State.Phase != "" { return job, nil } job.Status.State.Phase = batch.Pending job.Status.State.LastTransitionTime = metav1.Now() job.Status.MinAvailable = job.Spec.MinAvailable jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) job.Status.Conditions = append(job.Status.Conditions, jobCondition) newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{}) if err != nil { klog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return nil, err } if err := cc.cache.Update(newJob); err != nil { klog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", newJob.Namespace, newJob.Name, err) return nil, err } return newJob, nil } func (cc *jobcontroller) recordPodGroupEvent(job *batch.Job, podGroup *scheduling.PodGroup) { var latestCondition *scheduling.PodGroupCondition // Get the latest condition by timestamp for _, condition := range podGroup.Status.Conditions { if condition.Status == v1.ConditionTrue { if latestCondition == nil || condition.LastTransitionTime.Time.After(latestCondition.LastTransitionTime.Time) { latestCondition = &condition } } } // If the latest condition is not scheduled, then a warning event is recorded if latestCondition != nil && latestCondition.Type != scheduling.PodGroupScheduled { cc.recorder.Eventf(job, v1.EventTypeWarning, string(batch.PodGroupPending), fmt.Sprintf("PodGroup %s:%s %s, reason: %s", job.Namespace, job.Name, strings.ToLower(string(latestCondition.Type)), latestCondition.Message)) } } func classifyAndAddUpPodBaseOnPhase(pod *v1.Pod, pending, running, succeeded, failed, unknown *int32) { switch pod.Status.Phase { case v1.PodPending: atomic.AddInt32(pending, 1) case v1.PodRunning: atomic.AddInt32(running, 1) case v1.PodSucceeded: atomic.AddInt32(succeeded, 1) case v1.PodFailed: atomic.AddInt32(failed, 1) default: atomic.AddInt32(unknown, 1) } } func calcPodStatus(pod *v1.Pod, taskStatusCount map[string]batch.TaskState) { taskName, found := pod.Annotations[batch.TaskSpecKey] if !found { return } calMutex.Lock() defer calMutex.Unlock() if _, ok := taskStatusCount[taskName]; !ok { taskStatusCount[taskName] = batch.TaskState{ Phase: make(map[v1.PodPhase]int32), } } switch pod.Status.Phase { case v1.PodPending: taskStatusCount[taskName].Phase[v1.PodPending]++ case v1.PodRunning: taskStatusCount[taskName].Phase[v1.PodRunning]++ case v1.PodSucceeded: taskStatusCount[taskName].Phase[v1.PodSucceeded]++ case v1.PodFailed: taskStatusCount[taskName].Phase[v1.PodFailed]++ default: 
taskStatusCount[taskName].Phase[v1.PodUnknown]++ } }
// isInitiated reports whether the job status has already been initialized, i.e. the job has moved past the empty and Pending phases.
func isInitiated(job *batch.Job) bool { return job.Status.State.Phase != "" && job.Status.State.Phase != batch.Pending }
func newCondition(status batch.JobPhase, lastTransitionTime *metav1.Time) batch.JobCondition { return batch.JobCondition{ Status: status, LastTransitionTime: lastTransitionTime, } }
/* Copyright 2017 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "context" "fmt" "strconv" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" bus "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/apis/pkg/apis/helpers" scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" jobcache "volcano.sh/volcano/pkg/controllers/cache" jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/controllers/job/state" ) func (cc *jobcontroller) addCommand(obj interface{}) { cmd, ok := obj.(*bus.Command) if !ok { klog.Errorf("obj is not Command") return } cc.commandQueue.Add(cmd) } func (cc *jobcontroller) addJob(obj interface{}) { job, ok := obj.(*batch.Job) if !ok { klog.Errorf("obj is not Job") return } req := apis.Request{ Namespace: job.Namespace, JobName: job.Name, Event: bus.OutOfSyncEvent, } // TODO(k82cn): if failed to add job, the cache should be refresh if err := cc.cache.Add(job); err != nil { klog.Errorf("Failed to add job <%s/%s>: %v in cache", job.Namespace, job.Name, err) } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) } func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) { newJob, ok := newObj.(*batch.Job) if !ok { klog.Errorf("newObj is not Job") return } oldJob, ok := oldObj.(*batch.Job) if !ok { klog.Errorf("oldJob is not Job") return } // No need to update if ResourceVersion is not changed if newJob.ResourceVersion == oldJob.ResourceVersion { klog.V(6).Infof("No need to update because job is not modified.") return } if err := cc.cache.Update(newJob); err != nil { klog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache", newJob.Namespace, newJob.Name, err) } // NOTE: Since we only reconcile job based on Spec, we will ignore other attributes // For Job status, it's used internally and always been updated via our controller. if equality.Semantic.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase { klog.V(6).Infof("Job update event is ignored since no update in 'Spec'.") return } req := apis.Request{ Namespace: newJob.Namespace, JobName: newJob.Name, Event: bus.OutOfSyncEvent, } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) } func (cc *jobcontroller) deleteJob(obj interface{}) { job, ok := obj.(*batch.Job) if !ok { // If we reached here it means the Job was deleted but its final state is unrecorded. 
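// In that case the informer delivers a cache.DeletedFinalStateUnknown tombstone, so unwrap it to recover the
// last known *batch.Job before removing the object from the cache.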
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { klog.Errorf("Couldn't get object from tombstone %#v", obj) return } job, ok = tombstone.Obj.(*batch.Job) if !ok { klog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj) return } } if err := cc.cache.Delete(job); err != nil { klog.Errorf("Failed to delete job <%s/%s>: %v in cache", job.Namespace, job.Name, err) } // Delete job metrics state.DeleteJobMetrics(fmt.Sprintf("%s/%s", job.Namespace, job.Name), job.Spec.Queue) } func (cc *jobcontroller) addPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { klog.Errorf("Failed to convert %v to v1.Pod", obj) return } var jobUid types.UID // Filter out pods that are not created from volcano job if !isControlledBy(pod, helpers.JobKind) { return } else { jobUid = metav1.GetControllerOf(pod).UID } jobName, found := pod.Annotations[batch.JobNameKey] if !found { klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping", pod.Namespace, pod.Name) return } version, found := pod.Annotations[batch.JobVersion] if !found { klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping", pod.Namespace, pod.Name) return } dVersion, err := strconv.ParseInt(version, 10, 32) if err != nil { klog.Infof("Failed to convert jobVersion of Pod <%s/%s> into number, skipping", pod.Namespace, pod.Name) return } if pod.DeletionTimestamp != nil { cc.deletePod(pod) return } req := apis.Request{ Namespace: pod.Namespace, JobName: jobName, JobUid: jobUid, PodName: pod.Name, PodUID: pod.UID, Event: bus.PodPendingEvent, JobVersion: int32(dVersion), } if err := cc.cache.AddPod(pod); err != nil { klog.Errorf("Failed to add Pod <%s/%s>: %v to cache", pod.Namespace, pod.Name, err) } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) } func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) { oldPod, ok := oldObj.(*v1.Pod) if !ok { klog.Errorf("Failed to convert %v to v1.Pod", oldObj) return } newPod, ok := newObj.(*v1.Pod) if !ok { klog.Errorf("Failed to convert %v to v1.Pod", newObj) return } var jobUid types.UID // Filter out pods that are not created from volcano job if !isControlledBy(newPod, helpers.JobKind) { return } else { jobUid = metav1.GetControllerOf(newPod).UID } if newPod.ResourceVersion == oldPod.ResourceVersion { return } if newPod.DeletionTimestamp != nil { cc.deletePod(newObj) return } taskName, found := newPod.Annotations[batch.TaskSpecKey] if !found { klog.Infof("Failed to find taskName of Pod <%s/%s>, skipping", newPod.Namespace, newPod.Name) return } jobName, found := newPod.Annotations[batch.JobNameKey] if !found { klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping", newPod.Namespace, newPod.Name) return } version, found := newPod.Annotations[batch.JobVersion] if !found { klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping", newPod.Namespace, newPod.Name) return } dVersion, err := strconv.ParseInt(version, 10, 32) if err != nil { klog.Infof("Failed to convert jobVersion of Pod into number <%s/%s>, skipping", newPod.Namespace, newPod.Name) return } if err := cc.cache.UpdatePod(newPod); err != nil { klog.Errorf("Failed to update Pod <%s/%s>: %v in cache", newPod.Namespace, newPod.Name, err) } event := bus.OutOfSyncEvent var exitCode int32 switch newPod.Status.Phase { case v1.PodFailed: if oldPod.Status.Phase != v1.PodFailed { event = bus.PodFailedEvent // TODO: currently only one container pod is supported by volcano // Once multi containers pod is supported, update accordingly. 
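// The exit code forwarded in the request is therefore taken from the first container's terminated state only.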
if len(newPod.Status.ContainerStatuses) > 0 && newPod.Status.ContainerStatuses[0].State.Terminated != nil { exitCode = newPod.Status.ContainerStatuses[0].State.Terminated.ExitCode } } case v1.PodSucceeded: if oldPod.Status.Phase != v1.PodSucceeded && cc.cache.TaskCompleted(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = bus.TaskCompletedEvent } case v1.PodRunning: if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = bus.TaskFailedEvent } if oldPod.Status.Phase != v1.PodRunning { event = bus.PodRunningEvent } case v1.PodPending: if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = bus.TaskFailedEvent } if oldPod.Status.Phase != v1.PodPending { event = bus.PodPendingEvent } } req := apis.Request{ Namespace: newPod.Namespace, JobName: jobName, JobUid: jobUid, TaskName: taskName, PodName: newPod.Name, PodUID: newPod.UID, Event: event, ExitCode: exitCode, JobVersion: int32(dVersion), } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) } func (cc *jobcontroller) deletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { // If we reached here it means the pod was deleted but its final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { klog.Errorf("Couldn't get object from tombstone %#v", obj) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { klog.Errorf("Tombstone contained object that is not a Pod: %#v", obj) return } } var jobUid types.UID // Filter out pods that are not created from volcano job if !isControlledBy(pod, helpers.JobKind) { return } else { jobUid = metav1.GetControllerOf(pod).UID } taskName, found := pod.Annotations[batch.TaskSpecKey] if !found { klog.Infof("Failed to find taskName of Pod <%s/%s>, skipping", pod.Namespace, pod.Name) return } jobName, found := pod.Annotations[batch.JobNameKey] if !found { klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping", pod.Namespace, pod.Name) return } version, found := pod.Annotations[batch.JobVersion] if !found { klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping", pod.Namespace, pod.Name) return } dVersion, err := strconv.ParseInt(version, 10, 32) if err != nil { klog.Infof("Failed to convert jobVersion of Pod <%s/%s> into number, skipping", pod.Namespace, pod.Name) return } req := apis.Request{ Namespace: pod.Namespace, JobName: jobName, JobUid: jobUid, TaskName: taskName, PodName: pod.Name, PodUID: pod.UID, Event: bus.PodEvictedEvent, JobVersion: int32(dVersion), } if err := cc.cache.DeletePod(pod); err != nil { klog.Errorf("Failed to delete Pod <%s/%s>: %v in cache", pod.Namespace, pod.Name, err) } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) } func (cc *jobcontroller) recordJobEvent(namespace, name string, event batch.JobEvent, message string) { job, err := cc.cache.Get(jobcache.JobKeyByName(namespace, name)) if err != nil { klog.Warningf("Failed to find job in cache when reporting job event <%s/%s>: %v", namespace, name, err) return } cc.recorder.Event(job.Job, v1.EventTypeNormal, string(event), message) } func (cc *jobcontroller) handleCommands() { for cc.processNextCommand() { } } func (cc *jobcontroller) processNextCommand() bool { obj, shutdown := cc.commandQueue.Get() if shutdown { return false } cmd := obj.(*bus.Command) defer cc.commandQueue.Done(cmd) if err := cc.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{}); err != nil { if 
!apierrors.IsNotFound(err) { klog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name) cc.commandQueue.AddRateLimited(cmd) } return true } cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name, batch.CommandIssued, fmt.Sprintf( "Start to execute command %s, and delete it to ensure it is executed no more than once.", cmd.Action)) req := apis.Request{ Namespace: cmd.Namespace, JobName: cmd.TargetObject.Name, Event: bus.CommandIssuedEvent, Action: bus.Action(cmd.Action), } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) return true } func (cc *jobcontroller) updatePodGroup(oldObj, newObj interface{}) { oldPG, ok := oldObj.(*scheduling.PodGroup) if !ok { klog.Errorf("Failed to convert %v to PodGroup", oldObj) return } newPG, ok := newObj.(*scheduling.PodGroup) if !ok { klog.Errorf("Failed to convert %v to PodGroup", newObj) return } jobNameKey := newPG.Name ors := newPG.OwnerReferences for _, or := range ors { if or.Kind == "Job" { jobNameKey = or.Name } } _, err := cc.cache.Get(jobcache.JobKeyByName(newPG.Namespace, jobNameKey)) if err != nil && newPG.Annotations != nil { klog.Warningf( "Failed to find job in cache by PodGroup(%s/%s), this may not be a PodGroup for a volcano job.", newPG.Namespace, newPG.Name) } if newPG.Status.Phase != oldPG.Status.Phase { req := apis.Request{ Namespace: newPG.Namespace, JobName: jobNameKey, } switch newPG.Status.Phase { case scheduling.PodGroupUnknown: req.Event = bus.JobUnknownEvent } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req) } } // TODO(k82cn): add handler for PodGroup unschedulable event.
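// Illustrative sketch (not part of the controller): the handlers above all funnel into the same pattern --
// build an apis.Request from the object's volcano annotations, then shard it onto a per-job worker queue so
// that events for one job are always handled by the same worker. The namespace, job name, pod name and
// version below are hypothetical values used only for illustration.
//
//	req := apis.Request{
//		Namespace:  "default",
//		JobName:    "mnist-train",          // from the batch.JobNameKey annotation
//		TaskName:   "worker",               // from the batch.TaskSpecKey annotation
//		PodName:    "mnist-train-worker-0",
//		Event:      bus.PodFailedEvent,     // derived from the observed phase transition
//		ExitCode:   137,                    // first container's terminated exit code
//		JobVersion: 2,                      // from the batch.JobVersion annotation
//	}
//	key := jobhelpers.GetJobKeyByReq(&req)  // "default/mnist-train"
//	cc.getWorkerQueue(key).Add(req)         // the same key always maps to the same queue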
/* Copyright 2019 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "fmt" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/controllers/job/plugins" pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface" ) func (cc *jobcontroller) pluginOnPodCreate(job *batch.Job, pod *v1.Pod) error { client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient} for name, args := range job.Spec.Plugins { pb, found := plugins.GetPluginBuilder(name) if !found { err := fmt.Errorf("failed to get plugin %s", name) klog.Error(err) return err } klog.Infof("Starting to execute plugin at <pluginOnPodCreate>: %s on job: <%s/%s>", name, job.Namespace, job.Name) if err := pb(client, args).OnPodCreate(pod, job); err != nil { klog.Errorf("Failed to process on pod create plugin %s, err %v.", name, err) return err } } return nil } func (cc *jobcontroller) pluginOnJobAdd(job *batch.Job) error { client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient} if job.Status.ControlledResources == nil { job.Status.ControlledResources = make(map[string]string) } for name, args := range job.Spec.Plugins { pb, found := plugins.GetPluginBuilder(name) if !found { err := fmt.Errorf("failed to get plugin %s", name) klog.Error(err) return err } klog.Infof("Starting to execute plugin at <pluginOnJobAdd>: %s on job: <%s/%s>", name, job.Namespace, job.Name) if err := pb(client, args).OnJobAdd(job); err != nil { klog.Errorf("Failed to process on job add plugin %s, err %v.", name, err) return err } } return nil } func (cc *jobcontroller) pluginOnJobDelete(job *batch.Job) error { if job.Status.ControlledResources == nil { job.Status.ControlledResources = make(map[string]string) } client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient} for name, args := range job.Spec.Plugins { pb, found := plugins.GetPluginBuilder(name) if !found { err := fmt.Errorf("failed to get plugin %s", name) klog.Error(err) return err } klog.Infof("Starting to execute plugin at <pluginOnJobDelete>: %s on job: <%s/%s>", name, job.Namespace, job.Name) if err := pb(client, args).OnJobDelete(job); err != nil { klog.Errorf("failed to process on job delete plugin %s, err %v.", name, err) return err } } return nil } func (cc *jobcontroller) pluginOnJobUpdate(job *batch.Job) error { client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient} if job.Status.ControlledResources == nil { job.Status.ControlledResources = make(map[string]string) } for name, args := range job.Spec.Plugins { pb, found := plugins.GetPluginBuilder(name) if !found { err := fmt.Errorf("failed to get plugin %s", name) klog.Error(err) return err } klog.Infof("Starting to execute plugin at <pluginOnJobUpdate>: %s on job: <%s/%s>", name, job.Namespace, job.Name) if err := pb(client, args).OnJobUpdate(job); err != nil { klog.Errorf("Failed to process on job update plugin %s, err %v.", name, err) return err } } return nil }
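// Illustrative sketch (not part of the controller): what a job plugin invoked by the hooks above might look
// like. The loops above resolve every entry of job.Spec.Plugins to a builder via plugins.GetPluginBuilder and
// call its OnPodCreate/OnJobAdd/OnJobUpdate/OnJobDelete hooks. Only those four hook signatures are confirmed
// by the calls above; the interface name (PluginInterface), the Name method and the RegisterPluginBuilder
// helper are assumptions and may differ in the plugins package.
//
//	type noopPlugin struct {
//		client pluginsinterface.PluginClientset
//		args   []string // raw arguments from job.Spec.Plugins["noop"]
//	}
//
//	func (p *noopPlugin) Name() string                                  { return "noop" }
//	func (p *noopPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error { return nil }
//	func (p *noopPlugin) OnJobAdd(job *batch.Job) error                 { return nil }
//	func (p *noopPlugin) OnJobUpdate(job *batch.Job) error              { return nil }
//	func (p *noopPlugin) OnJobDelete(job *batch.Job) error              { return nil }
//
//	// Hypothetical registration so GetPluginBuilder("noop") can find the builder:
//	// plugins.RegisterPluginBuilder("noop", func(c pluginsinterface.PluginClientset, args []string) pluginsinterface.PluginInterface {
//	//	return &noopPlugin{client: c, args: args}
//	// })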
/* Copyright 2019 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "context" "fmt" "time" "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ) func newRateLimitingQueue() workqueue.TypedRateLimitingInterface[any] { return workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter[any]( workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 180*time.Second), // 10 qps, 100 bucket size. This is only for retry speed and it is only the overall factor (not per item). &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, )) } func (cc *jobcontroller) processResyncTask() { obj, shutdown := cc.errTasks.Get() if shutdown { return } defer cc.errTasks.Done(obj) // a task is resynced at most 10 times if cc.errTasks.NumRequeues(obj) > 10 { cc.errTasks.Forget(obj) return } task, ok := obj.(*v1.Pod) if !ok { klog.Errorf("failed to convert %v to *v1.Pod", obj) return } if err := cc.syncTask(task); err != nil { klog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err) cc.resyncTask(task) } } func (cc *jobcontroller) syncTask(oldTask *v1.Pod) error { newPod, err := cc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { if err := cc.cache.DeletePod(oldTask); err != nil { klog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err) return err } klog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name) return nil } return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err) } return cc.cache.UpdatePod(newPod) } func (cc *jobcontroller) resyncTask(task *v1.Pod) { cc.errTasks.AddRateLimited(task) }
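// Worked note (derived from the limiter configuration above, no new behavior): errTasks combines a per-item
// exponential backoff (5ms base, 180s cap) with an overall 10 qps / 100 burst token bucket, and
// processResyncTask forgets an item once NumRequeues exceeds 10. A pod whose sync keeps failing is therefore
// retried after roughly 5ms, 10ms, 20ms, ... (5ms * 2^n) for about ten attempts -- only a few seconds of
// cumulative backoff -- before it is dropped; the 180s cap is never reached within that retry budget.
//
//	cc.resyncTask(pod) // -> errTasks.AddRateLimited(pod); the next retry is delayed by the larger of the two limiters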
/* Copyright 2017 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package job import ( "fmt" "sort" "strconv" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/klog/v2" batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/apis/pkg/apis/helpers" schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" jobcache "volcano.sh/volcano/pkg/controllers/cache" jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/controllers/job/state" "volcano.sh/volcano/pkg/controllers/util" ) // MakePodName builds the pod name from the job name, task name and index. func MakePodName(jobName string, taskName string, index int) string { return fmt.Sprintf(jobhelpers.PodNameFmt, jobName, taskName, index) } func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy batch.NumaPolicy, ix int, jobForwarding bool) *v1.Pod { templateCopy := template.DeepCopy() pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: jobhelpers.MakePodName(job.Name, template.Name, ix), Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), }, Labels: templateCopy.Labels, Annotations: templateCopy.Annotations, }, Spec: templateCopy.Spec, } // If no scheduler name in Pod, use scheduler name from Job.
if len(pod.Spec.SchedulerName) == 0 { pod.Spec.SchedulerName = job.Spec.SchedulerName } // If no priority class specified in pod template, use priority class specified in job if len(pod.Spec.PriorityClassName) == 0 && len(job.Spec.PriorityClassName) != 0 { pod.Spec.PriorityClassName = job.Spec.PriorityClassName } volumeMap := make(map[string]string) for _, volume := range job.Spec.Volumes { vcName := volume.VolumeClaimName name := fmt.Sprintf("%s-%s", job.Name, jobhelpers.GenRandomStr(12)) if _, ok := volumeMap[vcName]; !ok { volume := v1.Volume{ Name: name, VolumeSource: v1.VolumeSource{ PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ ClaimName: vcName, }, }, } pod.Spec.Volumes = append(pod.Spec.Volumes, volume) volumeMap[vcName] = name } else { // duplicate volumes, should be prevented continue } for i, c := range pod.Spec.Containers { vm := v1.VolumeMount{ MountPath: volume.MountPath, Name: name, } pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) } } tsKey := templateCopy.Name if len(tsKey) == 0 { tsKey = batch.DefaultTaskSpec } if len(pod.Annotations) == 0 { pod.Annotations = make(map[string]string) } index := strconv.Itoa(ix) pod.Annotations[batch.TaskIndex] = index pod.Annotations[batch.TaskSpecKey] = tsKey pgName := job.Name + "-" + string(job.UID) pod.Annotations[schedulingv2.KubeGroupNameAnnotationKey] = pgName pod.Annotations[batch.JobNameKey] = job.Name pod.Annotations[batch.QueueNameKey] = job.Spec.Queue pod.Annotations[batch.JobVersion] = fmt.Sprintf("%d", job.Status.Version) pod.Annotations[batch.PodTemplateKey] = fmt.Sprintf("%s-%s", job.Name, template.Name) pod.Annotations[batch.JobRetryCountKey] = strconv.Itoa(int(job.Status.RetryCount)) if topologyPolicy != "" { pod.Annotations[schedulingv2.NumaPolicyKey] = string(topologyPolicy) } if len(job.Annotations) > 0 { if value, found := job.Annotations[schedulingv2.PodPreemptable]; found { pod.Annotations[schedulingv2.PodPreemptable] = value } if value, found := job.Annotations[schedulingv2.CooldownTime]; found { pod.Annotations[schedulingv2.CooldownTime] = value } if value, found := job.Annotations[schedulingv2.RevocableZone]; found { pod.Annotations[schedulingv2.RevocableZone] = value } if value, found := job.Annotations[schedulingv2.JDBMinAvailable]; found { pod.Annotations[schedulingv2.JDBMinAvailable] = value } else if value, found := job.Annotations[schedulingv2.JDBMaxUnavailable]; found { pod.Annotations[schedulingv2.JDBMaxUnavailable] = value } } if len(pod.Labels) == 0 { pod.Labels = make(map[string]string) } // Set pod labels for Service. 
pod.Labels[batch.TaskIndex] = index pod.Labels[batch.JobNameKey] = job.Name pod.Labels[batch.TaskSpecKey] = tsKey pod.Labels[batch.JobNamespaceKey] = job.Namespace pod.Labels[batch.QueueNameKey] = job.Spec.Queue if len(job.Labels) > 0 { if value, found := job.Labels[schedulingv2.PodPreemptable]; found { pod.Labels[schedulingv2.PodPreemptable] = value } if value, found := job.Labels[schedulingv2.CooldownTime]; found { pod.Labels[schedulingv2.CooldownTime] = value } } if jobForwarding { pod.Annotations[batch.JobForwardingKey] = "true" pod.Labels[batch.JobForwardingKey] = "true" } return pod } func applyPolicies(job *batch.Job, req *apis.Request) (delayAct *delayAction) { delayAct = &delayAction{ jobKey: jobcache.JobKeyByReq(req), event: req.Event, taskName: req.TaskName, podName: req.PodName, podUID: req.PodUID, // default action is sync job action: v1alpha1.SyncJobAction, } if len(req.Action) != 0 { delayAct.action = req.Action return } // If the event is an internal event, we do not need to perform any action if isInternalEvent(req.Event) { return } // Handle the scenario where pod events accumulate while vcjobs with the same name are frequently recreated; // stale pod events could otherwise leave the newly created vcjob with the same name in an abnormal status. if len(req.JobUid) != 0 && job != nil && req.JobUid != job.UID { klog.V(2).Infof("The req belongs to job(%s/%s) and job uid is %v, but the uid of job(%s/%s) is %v in cache, perform %v action", req.Namespace, req.JobName, req.JobUid, job.Namespace, job.Name, job.UID, v1alpha1.SyncJobAction) return } // All requests triggered by discarded job resources perform the sync action instead if req.JobVersion < job.Status.Version { klog.Infof("Request %s is outdated, will perform sync instead.", req) return } // Task level policies overwrite Job level policies if len(req.TaskName) != 0 { // Parse task level policies for _, task := range job.Spec.Tasks { if task.Name == req.TaskName { for _, policy := range task.Policies { policyEvents := getEventlist(policy) if len(policyEvents) > 0 && len(req.Event) > 0 { if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) { // Check whether the event requires a timeout configuration and whether a timeout policy is specified. // If the event does not require a timeout (shouldConfigureTimeout returns false), // or if a timeout policy is already set (policy.Timeout != nil), take the policy's action; // when a timeout policy is specified, the delay is set to the policy's Timeout.Duration.
if !shouldConfigureTimeout(req.Event) || policy.Timeout != nil { delayAct.action = policy.Action if policy.Timeout != nil { delayAct.delay = policy.Timeout.Duration } return } } } // exit code 0 is not an error code; this is enforced by the validating admission controller if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode { delayAct.action = policy.Action if policy.Timeout != nil { delayAct.delay = policy.Timeout.Duration } return } } break } } } // Parse Job level policies for _, policy := range job.Spec.Policies { policyEvents := getEventlist(policy) if len(policyEvents) > 0 && len(req.Event) > 0 { if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) { if !(shouldConfigureTimeout(req.Event) && policy.Timeout == nil) { delayAct.action = policy.Action if policy.Timeout != nil { delayAct.delay = policy.Timeout.Duration } return } } } // exit code 0 is not an error code; this is enforced by the validating admission controller if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode { delayAct.action = policy.Action if policy.Timeout != nil { delayAct.delay = policy.Timeout.Duration } return } } return } func shouldConfigureTimeout(event v1alpha1.Event) bool { return event == v1alpha1.PodPendingEvent } func getEventlist(policy batch.LifecyclePolicy) []v1alpha1.Event { policyEventsList := policy.Events if len(policy.Event) > 0 { policyEventsList = append(policyEventsList, policy.Event) } return policyEventsList } func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool { for _, event := range policyEvents { if event == reqEvent { return true } } return false } // TaskPriority structure. type TaskPriority struct { priority int32 batch.TaskSpec } // TasksPriority is a slice of TaskPriority. type TasksPriority []TaskPriority func (p TasksPriority) Len() int { return len(p) } func (p TasksPriority) Less(i, j int) bool { return p[i].priority > p[j].priority } func (p TasksPriority) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool { controllerRef := metav1.GetControllerOf(obj) if controllerRef == nil { return false } if controllerRef.APIVersion == gvk.GroupVersion().String() && controllerRef.Kind == gvk.Kind { return true } return false } // CalcFirstCountResources returns the total resources requested by the first count tasks, sorted by priority func (p TasksPriority) CalcFirstCountResources(count int32) v1.ResourceList { sort.Sort(p) minReq := v1.ResourceList{} for _, task := range p { if count <= task.Replicas { minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, count)) break } else { minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, task.Replicas)) count -= task.Replicas } } return minReq } // CalcPGMinResources sums up each task's minAvailable; if that is not enough, it fills up to jobMinAvailable using the tasks' remaining replicas func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList { sort.Sort(p) minReq := v1.ResourceList{} podCnt := int32(0) // 1. first sum up the tasks whose minAvailable is set
for _, task := range p { if task.MinAvailable == nil { // in practice, every task's minAvailable is set by the webhook continue } validReplicas := *task.MinAvailable if left := jobMinAvailable - podCnt; left < validReplicas { validReplicas = left } minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, validReplicas)) podCnt += validReplicas if podCnt >= jobMinAvailable { break } } if podCnt >= jobMinAvailable { return minReq } // 2. fill the pod count up to jobMinAvailable with tasks whose replicas are not yet used up, higher priority first leftCnt := jobMinAvailable - podCnt for _, task := range p { left := task.Replicas if task.MinAvailable != nil { if *task.MinAvailable == task.Replicas { continue } else { left = task.Replicas - *task.MinAvailable } } if leftCnt >= left { minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, left)) leftCnt -= left } else { minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, leftCnt)) leftCnt = 0 } if leftCnt <= 0 { break } } return minReq } // isInternalEvent checks if the event is an internal event func isInternalEvent(event v1alpha1.Event) bool { switch event { case v1alpha1.OutOfSyncEvent, v1alpha1.CommandIssuedEvent, v1alpha1.PodRunningEvent: return true default: return false } } // isInternalAction checks if the action is an internal action func isInternalAction(action v1alpha1.Action) bool { switch action { case v1alpha1.SyncJobAction, v1alpha1.EnqueueAction, v1alpha1.SyncQueueAction, v1alpha1.OpenQueueAction, v1alpha1.CloseQueueAction: return true default: return false } } // GetStateAction converts a delayAction into a state.Action, filling in the task or pod target for restart actions. func GetStateAction(delayAct *delayAction) state.Action { action := state.Action{Action: delayAct.action} if delayAct.action == v1alpha1.RestartTaskAction { action.Target = state.Target{TaskName: delayAct.taskName, Type: state.TargetTypeTask} } else if delayAct.action == v1alpha1.RestartPodAction { action.Target = state.Target{TaskName: delayAct.taskName, PodName: delayAct.podName, Type: state.TargetTypePod} } return action } // ActionType classifies an action by the level it operates on. type ActionType int const ( JobAction ActionType = iota TaskAction PodAction ) // GetActionType returns the level (job, task or pod) at which the given action operates. func GetActionType(action v1alpha1.Action) ActionType { switch action { case v1alpha1.AbortJobAction, v1alpha1.RestartJobAction, v1alpha1.TerminateJobAction, v1alpha1.CompleteJobAction, v1alpha1.ResumeJobAction: return JobAction case v1alpha1.RestartTaskAction: return TaskAction case v1alpha1.RestartPodAction: return PodAction } return JobAction }
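// Worked example for CalcPGMinResources (illustrative, hypothetical task sizes): with jobMinAvailable = 5 and
// two tasks sorted by priority --
//   task A: priority 10, replicas 4, minAvailable 2, each pod requesting 1 CPU
//   task B: priority 5,  replicas 3, minAvailable unset, each pod requesting 2 CPU
// Step 1 adds A's minAvailable (2 pods, 2 CPU) and skips B because its minAvailable is unset, so podCnt = 2.
// Step 2 fills the remaining 3 pods by priority: A still has 4 - 2 = 2 replicas left (add 2 pods, 2 CPU),
// then 1 more pod comes from B (2 CPU), which exhausts leftCnt.
// Result: minReq = 4 CPU from A + 2 CPU from B = 6 CPU, covering the 5 pods the podgroup needs to start.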