/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"context"
"fmt"
"hash/fnv"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
kubeschedulinginformers "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
kubeschedulinglisters "k8s.io/client-go/listers/scheduling/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
batchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
vcscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
batchinformer "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1"
businformer "volcano.sh/apis/pkg/client/informers/externalversions/bus/v1alpha1"
schedulinginformers "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
batchlister "volcano.sh/apis/pkg/client/listers/batch/v1alpha1"
buslister "volcano.sh/apis/pkg/client/listers/bus/v1alpha1"
schedulinglisters "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobcache "volcano.sh/volcano/pkg/controllers/cache"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/controllers/job/state"
"volcano.sh/volcano/pkg/features"
)
func init() {
framework.RegisterController(&jobcontroller{})
}
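// delayAction describes an action that should be executed for a job after a delay,
// e.g. restarting a job some time after one of its pods fails. It is tracked per job
// and per pod so that it can be canceled when a newer event makes it obsolete.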
type delayAction struct {
// The namespaced key of the job (namespace/name)
jobKey string
// The name of the task
taskName string
// The name of the pod
podName string
// The UID of the pod
podUID types.UID
// The event that triggered the action
event busv1alpha1.Event
// The action to take.
action busv1alpha1.Action
// The delay before the action is executed
delay time.Duration
// The cancel function of the action
cancel context.CancelFunc
}
// jobcontroller is the controller type for Volcano Jobs.
type jobcontroller struct {
kubeClient kubernetes.Interface
vcClient vcclientset.Interface
jobInformer batchinformer.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer schedulinginformers.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer businformer.CommandInformer
pcInformer kubeschedulinginformers.PriorityClassInformer
queueInformer schedulinginformers.QueueInformer
informerFactory informers.SharedInformerFactory
vcInformerFactory vcinformer.SharedInformerFactory
// A store of jobs
jobLister batchlister.JobLister
jobSynced func() bool
// A store of pods
podLister corelisters.PodLister
podSynced func() bool
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced func() bool
// A store of podgroups
pgLister schedulinglisters.PodGroupLister
pgSynced func() bool
// A store of services
svcLister corelisters.ServiceLister
svcSynced func() bool
cmdLister buslister.CommandLister
cmdSynced func() bool
pcLister kubeschedulinglisters.PriorityClassLister
pcSynced func() bool
queueLister schedulinglisters.QueueLister
queueSynced func() bool
// queues that need to sync up, one per worker
queueList []workqueue.TypedRateLimitingInterface[any]
commandQueue workqueue.TypedRateLimitingInterface[any]
cache jobcache.Cache
// Job Event recorder
recorder record.EventRecorder
errTasks workqueue.TypedRateLimitingInterface[any]
workers uint32
maxRequeueNum int
delayActionMapLock sync.RWMutex
// delayActionMap stores delayed actions for jobs, where outer map key is job key (namespace/name),
// inner map key is pod name, and value is the delayed action to be performed
delayActionMap map[string]map[string]*delayAction
}
func (cc *jobcontroller) Name() string {
return "job-controller"
}
// Initialize initializes the Job controller with the given options.
func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.kubeClient = opt.KubeClient
cc.vcClient = opt.VolcanoClient
sharedInformers := opt.SharedInformerFactory
workers := opt.WorkerNum
// Initialize event client
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
cc.informerFactory = sharedInformers
cc.queueList = make([]workqueue.TypedRateLimitingInterface[any], workers)
cc.commandQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
cc.cache = jobcache.New()
cc.errTasks = newRateLimitingQueue()
cc.recorder = recorder
cc.workers = workers
cc.maxRequeueNum = opt.MaxRequeueNum
if cc.maxRequeueNum < 0 {
cc.maxRequeueNum = -1
}
var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
}
factory := opt.VCSharedInformerFactory
cc.vcInformerFactory = factory
if utilfeature.DefaultFeatureGate.Enabled(features.VolcanoJobSupport) {
cc.jobInformer = factory.Batch().V1alpha1().Jobs()
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addJob,
UpdateFunc: cc.updateJob,
DeleteFunc: cc.deleteJob,
})
cc.jobLister = cc.jobInformer.Lister()
cc.jobSynced = cc.jobInformer.Informer().HasSynced
}
if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) {
cc.cmdInformer = factory.Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *busv1alpha1.Command:
if v.TargetObject != nil &&
v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
v.TargetObject.Kind == "Job" {
return true
}
return false
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
},
},
)
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced
}
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
})
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
cc.svcInformer = sharedInformers.Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced
cc.pgInformer = factory.Scheduling().V1beta1().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cc.updatePodGroup,
})
cc.pgLister = cc.pgInformer.Lister()
cc.pgSynced = cc.pgInformer.Informer().HasSynced
if utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) {
cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses()
cc.pcLister = cc.pcInformer.Lister()
cc.pcSynced = cc.pcInformer.Informer().HasSynced
}
cc.queueInformer = factory.Scheduling().V1beta1().Queues()
cc.queueLister = cc.queueInformer.Lister()
cc.queueSynced = cc.queueInformer.Informer().HasSynced
cc.delayActionMap = make(map[string]map[string]*delayAction)
// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
state.KillTarget = cc.killTarget
return nil
}
// Run starts the JobController.
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
cc.informerFactory.Start(stopCh)
cc.vcInformerFactory.Start(stopCh)
for informerType, ok := range cc.informerFactory.WaitForCacheSync(stopCh) {
if !ok {
klog.Errorf("caches failed to sync: %v", informerType)
return
}
}
for informerType, ok := range cc.vcInformerFactory.WaitForCacheSync(stopCh) {
if !ok {
klog.Errorf("caches failed to sync: %v", informerType)
return
}
}
go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
}
go cc.cache.Run(stopCh)
// Re-sync error tasks.
go wait.Until(cc.processResyncTask, 0, stopCh)
klog.Infof("JobController is running ...... ")
}
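// worker pops requests from its own queue and handles them until the queue shuts down.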
func (cc *jobcontroller) worker(i uint32) {
klog.Infof("worker %d start ...... ", i)
for cc.processNextReq(i) {
}
}
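// belongsToThisRoutine reports whether the job key hashes to the given worker index.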
func (cc *jobcontroller) belongsToThisRoutine(key string, count uint32) bool {
val := cc.genHash(key)
return val%cc.workers == count
}
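// getWorkerQueue returns the queue of the worker that the job key hashes to.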
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.TypedRateLimitingInterface[any] {
val := cc.genHash(key)
queue := cc.queueList[val%cc.workers]
return queue
}
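// genHash hashes the job key with 32-bit FNV-1 so that all requests for the same job
// always land on the same worker, e.g. (illustrative only):
//
//	cc.queueList[cc.genHash("default/my-job")%cc.workers]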
func (cc *jobcontroller) genHash(key string) uint32 {
hashVal := fnv.New32()
hashVal.Write([]byte(key))
return hashVal.Sum32()
}
func (cc *jobcontroller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
obj, shutdown := queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}
req := obj.(apis.Request)
defer queue.Done(req)
key := jobcache.JobKeyByReq(&req)
if !cc.belongsToThisRoutine(key, count) {
klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}
klog.V(3).Infof("Try to handle request <%v>", req)
cc.CleanPodDelayActionsIfNeed(req)
jobInfo, err := cc.cache.Get(key)
if err != nil {
// TODO(k82cn): ignore not-ready error.
klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return true
}
st := state.NewState(jobInfo)
if st == nil {
klog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return true
}
delayAct := applyPolicies(jobInfo.Job, &req)
if delayAct.delay != 0 {
klog.V(3).Infof("Execute <%v> on Job <%s/%s> after %s",
delayAct.action, req.Namespace, req.JobName, delayAct.delay.String())
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Execute action %s after %s", delayAct.action, delayAct.delay.String()))
cc.AddDelayActionForJob(req, delayAct)
return true
}
klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
delayAct.action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)
if delayAct.action != busv1alpha1.SyncJobAction {
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Start to execute action %s ", delayAct.action))
}
action := GetStateAction(delayAct)
if err := st.Execute(action); err != nil {
cc.handleJobError(queue, req, st, err, delayAct.action)
return true
}
// If no error, forget it.
queue.Forget(req)
// If the action is not an internal action, cancel all delayed actions
if !isInternalAction(delayAct.action) {
cc.cleanupDelayActions(delayAct)
}
return true
}
// CleanPodDelayActionsIfNeed cleans up delayed actions for Pod events when the pod phase changes:
// if the event is not a PodPending event:
// - cancel the corresponding Pod Pending delayed action
// - if the event is a PodRunning event, also cancel the corresponding Pod Failed and Pod Evicted delayed actions
func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
// Skip cleaning delayed actions for non-pod events
if !cc.isPodEvent(req) {
return
}
if req.Event != busv1alpha1.PodPendingEvent {
key := jobcache.JobKeyByReq(&req)
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()
if taskMap, exists := cc.delayActionMap[key]; exists {
if delayAct, exists := taskMap[req.PodName]; exists {
shouldCancel := false
if delayAct.event == busv1alpha1.PodPendingEvent {
// For PodPending delayed action, we need to check if the Pod UID matches
// Because if a Pod is deleted and immediately recreated,
// the new Pod's pending event may be queued before the old Pod's delete event
if req.PodUID == delayAct.podUID {
shouldCancel = true
}
}
if (delayAct.event == busv1alpha1.PodFailedEvent || delayAct.event == busv1alpha1.PodEvictedEvent) &&
req.Event == busv1alpha1.PodRunningEvent {
shouldCancel = true
}
if shouldCancel {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> of Job <%s>", delayAct.action, req.PodName, req.Event, delayAct.jobKey)
delayAct.cancel()
delete(taskMap, req.PodName)
}
}
}
}
}
func (cc *jobcontroller) isPodEvent(req apis.Request) bool {
return req.Event == busv1alpha1.PodPendingEvent ||
req.Event == busv1alpha1.PodRunningEvent ||
req.Event == busv1alpha1.PodFailedEvent ||
req.Event == busv1alpha1.PodEvictedEvent
}
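// AddDelayActionForJob registers a delayed action for the request's pod and starts a
// timer goroutine that executes the action once the delay expires, unless it is
// canceled first by CleanPodDelayActionsIfNeed or cleanupDelayActions.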
func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()
m, ok := cc.delayActionMap[delayAct.jobKey]
if !ok {
m = make(map[string]*delayAction)
cc.delayActionMap[delayAct.jobKey] = m
}
if oldDelayAct, exists := m[req.PodName]; exists && oldDelayAct.action == delayAct.action {
return
}
m[req.PodName] = delayAct
ctx, cancel := context.WithTimeout(context.Background(), delayAct.delay)
delayAct.cancel = cancel
go func() {
<-ctx.Done()
if ctx.Err() == context.Canceled {
klog.V(4).Infof("Job<%s/%s>'s delayed action %s is canceled", req.Namespace, req.JobName, delayAct.action)
return
}
klog.V(4).Infof("Job<%s/%s>'s delayed action %s is expired, execute it", req.Namespace, req.JobName, delayAct.action)
jobInfo, err := cc.cache.Get(delayAct.jobKey)
if err != nil {
klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return
}
st := state.NewState(jobInfo)
if st == nil {
klog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return
}
queue := cc.getWorkerQueue(delayAct.jobKey)
if err := st.Execute(GetStateAction(delayAct)); err != nil {
cc.handleJobError(queue, req, st, err, delayAct.action)
}
queue.Forget(req)
cc.cleanupDelayActions(delayAct)
}()
}
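// handleJobError requeues the failed request with rate limiting until maxRequeueNum is
// reached, then terminates the job and drops the request from the queue.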
func (cc *jobcontroller) handleJobError(queue workqueue.TypedRateLimitingInterface[any], req apis.Request, st state.State, err error, action busv1alpha1.Action) {
if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
req.Namespace, req.JobName, err)
queue.AddRateLimited(req)
return
}
cc.recordJobEvent(req.Namespace, req.JobName, batchv1alpha1.ExecuteAction,
fmt.Sprintf("Job failed on action %s for retry limit reached", action))
klog.Warningf("Terminating Job <%s/%s> and releasing resources", req.Namespace, req.JobName)
if err = st.Execute(state.Action{Action: busv1alpha1.TerminateJobAction}); err != nil {
klog.Errorf("Failed to terminate Job<%s/%s>: %v", req.Namespace, req.JobName, err)
}
klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached",
req.Namespace, req.JobName, err)
}
// cleanupDelayActions cleans up delayed actions
// After a delayed action is executed, other delayed actions of the same type need to be cleaned up to avoid duplicate execution
// Parameters:
// - currentDelayAction: the delayed action that has just been executed
//
// Implementation logic:
// 1. Get the type of current delayed action (Job level, Task level or Pod level)
// 2. Iterate through all delayed actions under this Job
// 3. If the delayed action type matches, cancel it and remove from the map
//
// Usage scenarios:
// - When a Pod failure triggers Job termination, other Job-level delayed actions under this Job need to be canceled
func (cc *jobcontroller) cleanupDelayActions(currentDelayAction *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()
actionType := GetActionType(currentDelayAction.action)
if m, exists := cc.delayActionMap[currentDelayAction.jobKey]; exists {
for _, delayAct := range m {
if GetActionType(delayAct.action) == actionType {
// For Task level actions, only cancel delayed actions for the same task
if actionType == TaskAction && delayAct.taskName != currentDelayAction.taskName {
continue
}
// For Pod level actions, only cancel delayed actions for the same pod
if actionType == PodAction && delayAct.podName != currentDelayAction.podName {
continue
}
if delayAct.cancel != nil {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> because of event <%s> and action <%s> of Job <%s>", delayAct.action, delayAct.podName, currentDelayAction.event, currentDelayAction.action, delayAct.jobKey)
delayAct.cancel()
}
delete(m, delayAct.podName)
}
}
}
}
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
)
var calMutex sync.Mutex
// getPodGroupByJob returns the podgroup related to the vcjob.
// It returns the normal pg if it exists in the cluster;
// otherwise it falls back to the legacy pg used before version 1.5.
func (cc *jobcontroller) getPodGroupByJob(job *batch.Job) (*scheduling.PodGroup, error) {
pgName := cc.generateRelatedPodGroupName(job)
pg, err := cc.pgLister.PodGroups(job.Namespace).Get(pgName)
if err == nil {
return pg, nil
}
if apierrors.IsNotFound(err) {
pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
if err != nil {
return nil, err
}
return pg, nil
}
return nil, err
}
func (cc *jobcontroller) generateRelatedPodGroupName(job *batch.Job) string {
return fmt.Sprintf("%s-%s", job.Name, string(job.UID))
}
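// killTarget kills a single task or a single pod of the job, as described by target.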
func (cc *jobcontroller) killTarget(jobInfo *apis.JobInfo, target state.Target, updateStatus state.UpdateStatusFn) error {
if target.Type == state.TargetTypeTask {
klog.V(3).Infof("Killing task <%s> of Job <%s/%s>, current version %d", target.TaskName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version)
defer klog.V(3).Infof("Finished task <%s> of Job <%s/%s> killing, current version %d", target.TaskName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version)
} else if target.Type == state.TargetTypePod {
klog.V(3).Infof("Killing pod <%s> of Job <%s/%s>, current version %d", target.PodName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version)
defer klog.V(3).Infof("Finished pod <%s> of Job <%s/%s> killing, current version %d", target.PodName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version)
}
return cc.killPods(jobInfo, nil, &target, updateStatus)
}
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
klog.V(3).Infof("Killing Job <%s/%s>, current version %d", jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version)
defer klog.V(3).Infof("Finished Job <%s/%s> killing, current version %d", jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version)
return cc.killPods(jobInfo, podRetainPhase, nil, updateStatus)
}
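// killPods deletes the pods of a job selected by target (or all pods except the retained
// phases when target is nil), recalculates the pod statistics, updates the job status and
// finally deletes the job's PodGroup.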
func (cc *jobcontroller) killPods(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, target *state.Target, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
if job.DeletionTimestamp != nil {
klog.Infof("Job <%s/%s> is terminating, skip management process.",
job.Namespace, job.Name)
return nil
}
var pending, running, terminating, succeeded, failed, unknown int32
taskStatusCount := make(map[string]batch.TaskState)
var errs []error
var total int
podsToKill := make(map[string]*v1.Pod)
if target != nil {
if target.Type == state.TargetTypeTask {
if targetPods, found := jobInfo.Pods[target.TaskName]; found {
podsToKill = targetPods
}
} else if target.Type == state.TargetTypePod {
if targetPods, found := jobInfo.Pods[target.TaskName]; found {
if pod, found := targetPods[target.PodName]; found {
podsToKill[target.PodName] = pod
}
}
}
total += len(podsToKill)
} else {
// Job version is bumped only when job is killed
job.Status.Version++
for _, pods := range jobInfo.Pods {
for _, pod := range pods {
total++
if pod.DeletionTimestamp != nil {
klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
terminating++
continue
}
maxRetry := job.Spec.MaxRetry
lastRetry := false
if job.Status.RetryCount >= maxRetry-1 {
lastRetry = true
}
// Only retain the Failed and Succeeded pods at the last retry.
// If it is not the last retry, kill pod as defined in `podRetainPhase`.
retainPhase := podRetainPhase
if lastRetry {
retainPhase = state.PodRetainPhaseSoft
}
_, retain := retainPhase[pod.Status.Phase]
if !retain {
podsToKill[pod.Name] = pod
}
}
}
}
for _, pod := range podsToKill {
if pod.DeletionTimestamp != nil {
klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
terminating++
continue
}
err := cc.deleteJobPod(job.Name, pod)
if err == nil {
terminating++
continue
}
// record the error, then collect the pod info as if it were a retained pod
errs = append(errs, err)
cc.resyncTask(pod)
classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
calcPodStatus(pod, taskStatusCount)
}
if len(errs) != 0 {
klog.Errorf("failed to kill pods for job %s/%s, with err %+v", job.Namespace, job.Name, errs)
cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason,
fmt.Sprintf("Error deleting pods: %+v", errs))
return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
}
job = job.DeepCopy()
job.Status.Pending = pending
job.Status.Running = running
job.Status.Succeeded = succeeded
job.Status.Failed = failed
job.Status.Terminating = terminating
job.Status.Unknown = unknown
job.Status.TaskStatusCount = taskStatusCount
if updateStatus != nil {
if updateStatus(&job.Status) {
job.Status.State.LastTransitionTime = metav1.Now()
jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
}
}
// Update running duration
runningDuration := metav1.Duration{Duration: job.Status.State.LastTransitionTime.Sub(jobInfo.Job.CreationTimestamp.Time)}
klog.V(3).Infof("Running duration is %s", runningDuration.ToUnstructured())
job.Status.RunningDuration = &runningDuration
// must be called before update job status
if err := cc.pluginOnJobDelete(job); err != nil {
return err
}
// Update Job status
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if apierrors.IsNotFound(err) {
klog.Errorf("Job %v/%v was not found", job.Namespace, job.Name)
return nil
}
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
// Delete PodGroup
pg, err := cc.getPodGroupByJob(job)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
if pg != nil {
if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pg.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete PodGroup of Job %s/%s: %v", job.Namespace, job.Name, err)
return err
}
}
}
// NOTE(k82cn): DO NOT delete input/output until job is deleted.
return nil
}
func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
jobInstance, err := cc.initJobStatus(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
fmt.Sprintf("Failed to initialize job status, err: %v", err))
return nil, err
}
if err := cc.pluginOnJobAdd(jobInstance); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return nil, err
}
newJob, err := cc.createJobIOIfNotExist(jobInstance)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
return nil, err
}
if err := cc.createOrUpdatePodGroup(newJob); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return nil, err
}
return newJob, nil
}
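// initOnJobUpdate re-runs the job plugins and reconciles the PodGroup when a job is updated.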
func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error {
klog.V(3).Infof("Starting to initiate Job <%s/%s> on update", job.Namespace, job.Name)
if err := cc.pluginOnJobUpdate(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return err
}
if err := cc.createOrUpdatePodGroup(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return err
}
return nil
}
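// GetQueueInfo fetches the scheduling Queue that the job was submitted to.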
func (cc *jobcontroller) GetQueueInfo(queue string) (*scheduling.Queue, error) {
queueInfo, err := cc.queueLister.Get(queue)
if err != nil {
klog.Errorf("Failed to get queue from queueLister, error: %s", err.Error())
}
return queueInfo, err
}
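// syncJob reconciles the pods of a job with its spec: it initiates the job if needed,
// creates missing pods per task (honoring task dependencies), deletes surplus pods on
// scale down, and updates the job status accordingly.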
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
defer klog.V(3).Infof("Finished Job <%s/%s> sync up, current version %d", job.Namespace, job.Name, job.Status.Version)
if jobInfo.Job.DeletionTimestamp != nil {
klog.Infof("Job <%s/%s> is terminating, skip management process.",
jobInfo.Job.Namespace, jobInfo.Job.Name)
return nil
}
// deep copy job to avoid mutating the cached object
job = job.DeepCopy()
// Find queue that job belongs to, and check if the queue has forwarding metadata
queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)
if err != nil {
return err
}
var jobForwarding bool
if len(queueInfo.Spec.ExtendClusters) != 0 {
jobForwarding = true
if len(job.Annotations) == 0 {
job.Annotations = make(map[string]string)
}
job.Annotations[batch.JobForwardingKey] = "true"
job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
}
// Initiate the job if it has not been initiated yet
if !isInitiated(job) {
if job, err = cc.initiateJob(job); err != nil {
return err
}
} else {
// TODO: optimize this call it only when scale up/down
if err = cc.initOnJobUpdate(job); err != nil {
return err
}
}
if len(queueInfo.Spec.ExtendClusters) != 0 {
jobForwarding = true
job.Annotations[batch.JobForwardingKey] = "true"
_, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
}
var syncTask bool
pg, err := cc.getPodGroupByJob(job)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
if pg != nil {
if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
syncTask = true
}
cc.recordPodGroupEvent(job, pg)
}
var jobCondition batch.JobCondition
oldStatus := job.Status
if !syncTask {
if updateStatus != nil {
updateStatus(&job.Status)
}
if equality.Semantic.DeepEqual(job.Status, oldStatus) {
klog.V(4).Infof("Job <%s/%s> has not updated for no changing", job.Namespace, job.Name)
return nil
}
job.Status.State.LastTransitionTime = metav1.Now()
jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
return nil
}
var running, pending, terminating, succeeded, failed, unknown int32
taskStatusCount := make(map[string]batch.TaskState)
podToCreate := make(map[string][]*v1.Pod)
var podToDelete []*v1.Pod
var creationErrs []error
var deletionErrs []error
appendMutex := sync.Mutex{}
appendError := func(container *[]error, err error) {
appendMutex.Lock()
defer appendMutex.Unlock()
*container = append(*container, err)
}
waitCreationGroup := sync.WaitGroup{}
for _, ts := range job.Spec.Tasks {
ts.Template.Name = ts.Name
tc := ts.Template.DeepCopy()
name := ts.Template.Name
pods, found := jobInfo.Pods[name]
if !found {
pods = map[string]*v1.Pod{}
}
var podToCreateEachTask []*v1.Pod
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)
if err := cc.pluginOnPodCreate(job, newPod); err != nil {
return err
}
podToCreateEachTask = append(podToCreateEachTask, newPod)
waitCreationGroup.Add(1)
} else {
delete(pods, podName)
if pod.DeletionTimestamp != nil {
klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
atomic.AddInt32(&terminating, 1)
continue
}
classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
calcPodStatus(pod, taskStatusCount)
}
}
podToCreate[ts.Name] = podToCreateEachTask
for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}
}
for taskName, podToCreateEachTask := range podToCreate {
if len(podToCreateEachTask) == 0 {
continue
}
go func(taskName string, podToCreateEachTask []*v1.Pod) {
taskIndex := jobhelpers.GetTaskIndexUnderJob(taskName, job)
if job.Spec.Tasks[taskIndex].DependsOn != nil {
if !cc.waitDependsOnTaskMeetCondition(taskIndex, job) {
klog.V(3).Infof("Job %s/%s depends on task not ready", job.Name, job.Namespace)
// release wait group
for _, pod := range podToCreateEachTask {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
}(pod)
}
return
}
}
for _, pod := range podToCreateEachTask {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
// Failed to create Pod; record the error and retry later.
// This is to ensure all pods under the same Job are eventually created,
// so gang-scheduling can schedule the Job successfully.
klog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
} else {
classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
calcPodStatus(newPod, taskStatusCount)
klog.V(5).Infof("Created Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
}(taskName, podToCreateEachTask)
}
waitCreationGroup.Wait()
if len(creationErrs) != 0 {
cc.recorder.Event(job, v1.EventTypeWarning, FailedCreatePodReason,
fmt.Sprintf("Error creating pods: %+v", creationErrs))
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}
// Delete pods when scale down.
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.deleteJobPod(job.Name, pod)
if err != nil {
// Failed to delete Pod; record the error and resync the task,
// so the deletion is retried and the Job converges to the desired scale.
klog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
appendError(&deletionErrs, err)
cc.resyncTask(pod)
} else {
klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
atomic.AddInt32(&terminating, 1)
}
}(pod)
}
waitDeletionGroup.Wait()
if len(deletionErrs) != 0 {
cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason,
fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
newStatus := batch.JobStatus{
State: job.Status.State,
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
Terminating: terminating,
Unknown: unknown,
Version: job.Status.Version,
MinAvailable: job.Spec.MinAvailable,
TaskStatusCount: taskStatusCount,
ControlledResources: job.Status.ControlledResources,
Conditions: job.Status.Conditions,
RetryCount: job.Status.RetryCount,
}
if updateStatus != nil {
updateStatus(&newStatus)
}
if reflect.DeepEqual(job.Status, newStatus) {
klog.V(3).Infof("Job <%s/%s> has not updated for no changing", job.Namespace, job.Name)
return nil
}
job.Status = newStatus
job.Status.State.LastTransitionTime = metav1.Now()
jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
return nil
}
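// waitDependsOnTaskMeetCondition reports whether the depends-on constraints of the given
// task are satisfied: with IterationAny one ready dependency suffices, otherwise all
// dependencies must be ready.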
func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskIndex int, job *batch.Job) bool {
if job.Spec.Tasks[taskIndex].DependsOn == nil {
return true
}
dependsOn := *job.Spec.Tasks[taskIndex].DependsOn
if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny {
// if any depends-on task is ready, the task can be created
for _, task := range dependsOn.Name {
if cc.isDependsOnPodsReady(task, job) {
return true
}
}
// none of the depends-on tasks is ready: skip creating the task
return false
}
for _, dependsOnTask := range dependsOn.Name {
// if any depends-on task is not ready, skip creating the task
if !cc.isDependsOnPodsReady(dependsOnTask, job) {
return false
}
}
// all depends-on tasks are ready: create the task
return true
}
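// isDependsOnPodsReady reports whether enough pods of the depends-on task are running
// with all containers ready, compared with the task's minimum available count.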
func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool {
dependsOnPods := jobhelpers.GetPodsNameUnderTask(task, job)
dependsOnTaskIndex := jobhelpers.GetTaskIndexUnderJob(task, job)
runningPodCount := 0
for _, podName := range dependsOnPods {
pod, err := cc.podLister.Pods(job.Namespace).Get(podName)
if err != nil {
// If the pod is not found, there are two possibilities:
// 1. the vcjob has been deleted: this function should return true.
// 2. the pod has not been created yet: this function should return false and keep waiting.
if apierrors.IsNotFound(err) {
_, errGetJob := cc.jobLister.Jobs(job.Namespace).Get(job.Name)
if errGetJob != nil {
return apierrors.IsNotFound(errGetJob)
}
}
klog.Errorf("Failed to get pod %v/%v %v", job.Namespace, podName, err)
continue
}
if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodSucceeded {
klog.V(5).Infof("Sequential state, pod %v/%v of depends on tasks is not running", pod.Namespace, pod.Name)
continue
}
allContainerReady := true
for _, containerStatus := range pod.Status.ContainerStatuses {
if !containerStatus.Ready {
allContainerReady = false
break
}
}
if allContainerReady {
runningPodCount++
}
}
dependsOnTaskMinReplicas := job.Spec.Tasks[dependsOnTaskIndex].MinAvailable
if dependsOnTaskMinReplicas != nil {
if runningPodCount < int(*dependsOnTaskMinReplicas) {
klog.V(5).Infof("In a depends on startup state, there are already %d pods running, which is less than the minimum number of runs", runningPodCount)
return false
}
}
return true
}
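// createJobIOIfNotExist ensures the PVCs declared in job.Spec.Volumes exist, generating
// names and creating claims where necessary, and records them as controlled resources.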
func (cc *jobcontroller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) {
// If the PVCs do not exist, create them for the Job.
var needUpdate bool
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
for index, volume := range job.Spec.Volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
// NOTE(k82cn): Ensure generated names are never duplicated.
for {
vcName = jobhelpers.GenPVCName(job.Name)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return job, err
}
if exist {
continue
}
job.Spec.Volumes[index].VolumeClaimName = vcName
needUpdate = true
break
}
// TODO: validate that VolumeClaim is set when VolumeClaimName is empty
if volume.VolumeClaim != nil {
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
return job, err
}
}
} else {
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return job, err
}
if !exist {
return job, fmt.Errorf("pvc %s is not found, the job will be in the Pending state until the PVC is created", vcName)
}
}
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
}
if needUpdate {
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update Job %v/%v for volume claim name: %v ",
job.Namespace, job.Name, err)
return job, err
}
newJob.Status = job.Status
return newJob, err
}
return job, nil
}
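// checkPVCExist reports whether the named PVC exists in the job's namespace.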
func (cc *jobcontroller) checkPVCExist(job *batch.Job, pvc string) (bool, error) {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(pvc); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
klog.V(3).Infof("Failed to get PVC %s for job <%s/%s>: %v",
pvc, job.Namespace, job.Name, err)
return false, err
}
return true, nil
}
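// createPVC creates a PersistentVolumeClaim owned by the job.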
func (cc *jobcontroller) createPVC(job *batch.Job, vcName string, volumeClaim *v1.PersistentVolumeClaimSpec) error {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: vcName,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: *volumeClaim,
}
klog.V(3).Infof("Try to create PVC: %v", pvc)
if _, e := cc.kubeClient.CoreV1().PersistentVolumeClaims(job.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{}); e != nil {
klog.V(3).Infof("Failed to create PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, e)
return e
}
return nil
}
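// createOrUpdatePodGroup creates the PodGroup for the job if it does not exist yet,
// otherwise it updates the existing PodGroup when the job spec requires it.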
func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
// If PodGroup does not exist, create one for Job.
pg, err := cc.getPodGroupByJob(job)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
minTaskMember := map[string]int32{}
for _, task := range job.Spec.Tasks {
if task.MinAvailable != nil {
minTaskMember[task.Name] = *task.MinAvailable
} else {
minTaskMember[task.Name] = task.Replicas
}
}
pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
// add job.UID into its name when creating a new PodGroup
Name: cc.generateRelatedPodGroupName(job),
Annotations: job.Annotations,
Labels: job.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: scheduling.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: job.Spec.Queue,
MinResources: cc.calcPGMinResources(job),
PriorityClassName: job.Spec.PriorityClassName,
},
}
if job.Spec.NetworkTopology != nil {
nt := &scheduling.NetworkTopologySpec{
Mode: scheduling.NetworkTopologyMode(job.Spec.NetworkTopology.Mode),
}
if job.Spec.NetworkTopology.HighestTierAllowed != nil {
nt.HighestTierAllowed = job.Spec.NetworkTopology.HighestTierAllowed
}
pg.Spec.NetworkTopology = nt
}
if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
}
return nil
}
podGroupToUpdate := pg.DeepCopy()
pgShouldUpdate := cc.shouldUpdateExistingPodGroup(podGroupToUpdate, job)
if !pgShouldUpdate {
return nil
}
_, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Update(context.TODO(), podGroupToUpdate, metav1.UpdateOptions{})
if err != nil {
klog.V(3).Infof("Failed to update PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
}
return err
}
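// shouldUpdateExistingPodGroup syncs the PodGroup spec with the job spec and reports
// whether anything changed and an update call is needed.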
func (cc *jobcontroller) shouldUpdateExistingPodGroup(pg *scheduling.PodGroup, job *batch.Job) bool {
pgShouldUpdate := false
if pg.Spec.PriorityClassName != job.Spec.PriorityClassName {
pg.Spec.PriorityClassName = job.Spec.PriorityClassName
pgShouldUpdate = true
}
minResources := cc.calcPGMinResources(job)
if pg.Spec.MinMember != job.Spec.MinAvailable || !equality.Semantic.DeepEqual(pg.Spec.MinResources, minResources) {
pg.Spec.MinMember = job.Spec.MinAvailable
pg.Spec.MinResources = minResources
pgShouldUpdate = true
}
if pg.Spec.MinTaskMember == nil {
pgShouldUpdate = true
pg.Spec.MinTaskMember = make(map[string]int32)
}
for _, task := range job.Spec.Tasks {
cnt := task.Replicas
if task.MinAvailable != nil {
cnt = *task.MinAvailable
}
if taskMember, ok := pg.Spec.MinTaskMember[task.Name]; !ok {
pgShouldUpdate = true
pg.Spec.MinTaskMember[task.Name] = cnt
} else {
if taskMember == cnt {
continue
}
pgShouldUpdate = true
pg.Spec.MinTaskMember[task.Name] = cnt
}
}
return pgShouldUpdate
}
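// deleteJobPod deletes a single pod of the job, tolerating NotFound errors.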
func (cc *jobcontroller) deleteJobPod(jobName string, pod *v1.Pod) error {
err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v",
pod.Namespace, pod.Name, jobName, err)
return fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err)
}
return nil
}
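// calcPGMinResources calculates the minimal resources of the job's PodGroup from the
// tasks sorted by priority class; see the two cases documented below.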
func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
// sort tasks by their priority classes
var tasksPriority TasksPriority
totalMinAvailable := int32(0)
for _, task := range job.Spec.Tasks {
tp := TaskPriority{0, task}
pc := task.Template.Spec.PriorityClassName
if pc != "" {
priorityClass, err := cc.pcLister.Get(pc)
if err != nil || priorityClass == nil {
klog.Warningf("Ignore task %s priority class %s: %v", task.Name, pc, err)
} else {
tp.priority = priorityClass.Value
}
}
tasksPriority = append(tasksPriority, tp)
if task.MinAvailable != nil { // actually, it can not be nil, because nil value will be patched in webhook
totalMinAvailable += *task.MinAvailable
} else {
totalMinAvailable += task.Replicas
}
}
// see docs https://github.com/volcano-sh/volcano/pull/2945
// 1. job.MinAvailable < sum(task.MinAvailable): regard the podgroup's min resources as the sum over
// the first job.MinAvailable pods sorted by task priority, according to
// https://github.com/volcano-sh/volcano/blob/c91eb07f2c300e4d5c826ff11a63b91781b3ac11/pkg/scheduler/api/job_info.go#L738-L740
if job.Spec.MinAvailable < totalMinAvailable {
minReq := tasksPriority.CalcFirstCountResources(job.Spec.MinAvailable)
return &minReq
}
// 2. job.MinAvailable >= sum(task.MinAvailable)
minReq := tasksPriority.CalcPGMinResources(job.Spec.MinAvailable)
return &minReq
}
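// initJobStatus moves an uninitialized job into the Pending phase and persists the status.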
func (cc *jobcontroller) initJobStatus(job *batch.Job) (*batch.Job, error) {
if job.Status.State.Phase != "" {
return job, nil
}
job.Status.State.Phase = batch.Pending
job.Status.State.LastTransitionTime = metav1.Now()
job.Status.MinAvailable = job.Spec.MinAvailable
jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return nil, err
}
if err := cc.cache.Update(newJob); err != nil {
klog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, err)
return nil, err
}
return newJob, nil
}
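// recordPodGroupEvent records a warning event on the job if its PodGroup is not scheduled yet.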
func (cc *jobcontroller) recordPodGroupEvent(job *batch.Job, podGroup *scheduling.PodGroup) {
var latestCondition *scheduling.PodGroupCondition
// Get the latest condition by timestamp
for _, condition := range podGroup.Status.Conditions {
if condition.Status == v1.ConditionTrue {
if latestCondition == nil ||
condition.LastTransitionTime.Time.After(latestCondition.LastTransitionTime.Time) {
latestCondition = &condition
}
}
}
// If the latest condition is not scheduled, then a warning event is recorded
if latestCondition != nil && latestCondition.Type != scheduling.PodGroupScheduled {
cc.recorder.Eventf(job, v1.EventTypeWarning, string(batch.PodGroupPending),
fmt.Sprintf("PodGroup %s:%s %s, reason: %s", job.Namespace, job.Name,
strings.ToLower(string(latestCondition.Type)), latestCondition.Message))
}
}
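// classifyAndAddUpPodBaseOnPhase atomically bumps the counter matching the pod's phase.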
func classifyAndAddUpPodBaseOnPhase(pod *v1.Pod, pending, running, succeeded, failed, unknown *int32) {
switch pod.Status.Phase {
case v1.PodPending:
atomic.AddInt32(pending, 1)
case v1.PodRunning:
atomic.AddInt32(running, 1)
case v1.PodSucceeded:
atomic.AddInt32(succeeded, 1)
case v1.PodFailed:
atomic.AddInt32(failed, 1)
default:
atomic.AddInt32(unknown, 1)
}
}
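// calcPodStatus bumps the per-task phase counter for the pod's task; safe for concurrent use.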
func calcPodStatus(pod *v1.Pod, taskStatusCount map[string]batch.TaskState) {
taskName, found := pod.Annotations[batch.TaskSpecKey]
if !found {
return
}
calMutex.Lock()
defer calMutex.Unlock()
if _, ok := taskStatusCount[taskName]; !ok {
taskStatusCount[taskName] = batch.TaskState{
Phase: make(map[v1.PodPhase]int32),
}
}
switch pod.Status.Phase {
case v1.PodPending:
taskStatusCount[taskName].Phase[v1.PodPending]++
case v1.PodRunning:
taskStatusCount[taskName].Phase[v1.PodRunning]++
case v1.PodSucceeded:
taskStatusCount[taskName].Phase[v1.PodSucceeded]++
case v1.PodFailed:
taskStatusCount[taskName].Phase[v1.PodFailed]++
default:
taskStatusCount[taskName].Phase[v1.PodUnknown]++
}
}
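// isInitiated reports whether the job has moved beyond the empty/Pending phase.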
func isInitiated(job *batch.Job) bool {
if job.Status.State.Phase == "" || job.Status.State.Phase == batch.Pending {
return false
}
return true
}
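// newCondition builds a JobCondition from the given phase and transition time.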
func newCondition(status batch.JobPhase, lastTransitionTime *metav1.Time) batch.JobCondition {
return batch.JobCondition{
Status: status,
LastTransitionTime: lastTransitionTime,
}
}
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"context"
"fmt"
"strconv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
bus "volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobcache "volcano.sh/volcano/pkg/controllers/cache"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
)
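// addCommand enqueues a Command targeting a volcano Job for processing.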
func (cc *jobcontroller) addCommand(obj interface{}) {
cmd, ok := obj.(*bus.Command)
if !ok {
klog.Errorf("obj is not Command")
return
}
cc.commandQueue.Add(cmd)
}
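// addJob adds the job to the cache and enqueues an OutOfSync request for it.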
func (cc *jobcontroller) addJob(obj interface{}) {
job, ok := obj.(*batch.Job)
if !ok {
klog.Errorf("obj is not Job")
return
}
req := apis.Request{
Namespace: job.Namespace,
JobName: job.Name,
Event: bus.OutOfSyncEvent,
}
// TODO(k82cn): if adding the job failed, the cache should be refreshed
if err := cc.cache.Add(job); err != nil {
klog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
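// updateJob updates the cached job and enqueues an OutOfSync request when the job spec
// or its state phase changed.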
func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) {
newJob, ok := newObj.(*batch.Job)
if !ok {
klog.Errorf("newObj is not Job")
return
}
oldJob, ok := oldObj.(*batch.Job)
if !ok {
klog.Errorf("oldJob is not Job")
return
}
// No need to update if ResourceVersion is not changed
if newJob.ResourceVersion == oldJob.ResourceVersion {
klog.V(6).Infof("No need to update because job is not modified.")
return
}
if err := cc.cache.Update(newJob); err != nil {
klog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}
// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
if equality.Semantic.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
klog.V(6).Infof("Job update event is ignored since no update in 'Spec'.")
return
}
req := apis.Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,
Event: bus.OutOfSyncEvent,
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
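// deleteJob removes the job from the cache and cleans up its metrics.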
func (cc *jobcontroller) deleteJob(obj interface{}) {
job, ok := obj.(*batch.Job)
if !ok {
// If we reached here it means the Job was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
job, ok = tombstone.Obj.(*batch.Job)
if !ok {
klog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj)
return
}
}
if err := cc.cache.Delete(job); err != nil {
klog.Errorf("Failed to delete job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
// Delete job metrics
state.DeleteJobMetrics(fmt.Sprintf("%s/%s", job.Namespace, job.Name), job.Spec.Queue)
}
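// addPod records a newly observed pod of a volcano job in the cache and enqueues a
// PodPending request for the owning job.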
func (cc *jobcontroller) addPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
klog.Errorf("Failed to convert %v to v1.Pod", obj)
return
}
var jobUid types.UID
// Filter out pods that are not created from volcano job
if !isControlledBy(pod, helpers.JobKind) {
return
}
jobUid = metav1.GetControllerOf(pod).UID
jobName, found := pod.Annotations[batch.JobNameKey]
if !found {
klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}
version, found := pod.Annotations[batch.JobVersion]
if !found {
klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}
dVersion, err := strconv.ParseInt(version, 10, 32)
if err != nil {
klog.Infof("Failed to convert jobVersion of Pod <%s/%s> into number, skipping",
pod.Namespace, pod.Name)
return
}
if pod.DeletionTimestamp != nil {
cc.deletePod(pod)
return
}
req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
JobUid: jobUid,
PodName: pod.Name,
PodUID: pod.UID,
Event: bus.PodPendingEvent,
JobVersion: int32(dVersion),
}
if err := cc.cache.AddPod(pod); err != nil {
klog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
pod.Namespace, pod.Name, err)
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
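// updatePod maps a pod phase transition to a job event (failed, succeeded, running,
// pending), updates the cache and enqueues the request for the owning job.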
func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
klog.Errorf("Failed to convert %v to v1.Pod", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
if !ok {
klog.Errorf("Failed to convert %v to v1.Pod", newObj)
return
}
var jobUid types.UID
// Filter out pods that are not created from volcano job
if !isControlledBy(newPod, helpers.JobKind) {
return
}
jobUid = metav1.GetControllerOf(newPod).UID
if newPod.ResourceVersion == oldPod.ResourceVersion {
return
}
if newPod.DeletionTimestamp != nil {
cc.deletePod(newObj)
return
}
taskName, found := newPod.Annotations[batch.TaskSpecKey]
if !found {
klog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
newPod.Namespace, newPod.Name)
return
}
jobName, found := newPod.Annotations[batch.JobNameKey]
if !found {
klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
newPod.Namespace, newPod.Name)
return
}
version, found := newPod.Annotations[batch.JobVersion]
if !found {
klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
newPod.Namespace, newPod.Name)
return
}
dVersion, err := strconv.ParseInt(version, 10, 32)
if err != nil {
klog.Infof("Failed to convert jobVersion of Pod into number <%s/%s>, skipping",
newPod.Namespace, newPod.Name)
return
}
if err := cc.cache.UpdatePod(newPod); err != nil {
klog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
newPod.Namespace, newPod.Name, err)
}
event := bus.OutOfSyncEvent
var exitCode int32
switch newPod.Status.Phase {
case v1.PodFailed:
if oldPod.Status.Phase != v1.PodFailed {
event = bus.PodFailedEvent
// TODO: currently only one container pod is supported by volcano
// Once multi containers pod is supported, update accordingly.
if len(newPod.Status.ContainerStatuses) > 0 && newPod.Status.ContainerStatuses[0].State.Terminated != nil {
exitCode = newPod.Status.ContainerStatuses[0].State.Terminated.ExitCode
}
}
case v1.PodSucceeded:
if oldPod.Status.Phase != v1.PodSucceeded &&
cc.cache.TaskCompleted(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
event = bus.TaskCompletedEvent
}
case v1.PodRunning:
if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
event = bus.TaskFailedEvent
}
if oldPod.Status.Phase != v1.PodRunning {
event = bus.PodRunningEvent
}
case v1.PodPending:
if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
event = bus.TaskFailedEvent
}
if oldPod.Status.Phase != v1.PodPending {
event = bus.PodPendingEvent
}
}
req := apis.Request{
Namespace: newPod.Namespace,
JobName: jobName,
JobUid: jobUid,
TaskName: taskName,
PodName: newPod.Name,
PodUID: newPod.UID,
Event: event,
ExitCode: exitCode,
JobVersion: int32(dVersion),
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
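// deletePod enqueues a PodEvicted request for the owning job and removes the pod from
// the cache; tombstones are unwrapped for pods whose final state was not observed.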
func (cc *jobcontroller) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
klog.Errorf("Tombstone contained object that is not a Pod: %#v", obj)
return
}
}
var jobUid types.UID
// Filter out pods that are not created from volcano job
if !isControlledBy(pod, helpers.JobKind) {
return
}
jobUid = metav1.GetControllerOf(pod).UID
taskName, found := pod.Annotations[batch.TaskSpecKey]
if !found {
klog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}
jobName, found := pod.Annotations[batch.JobNameKey]
if !found {
klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}
version, found := pod.Annotations[batch.JobVersion]
if !found {
klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
pod.Namespace, pod.Name)
return
}
dVersion, err := strconv.ParseInt(version, 10, 32)
if err != nil {
klog.Infof("Failed to convert jobVersion of Pod <%s/%s> into number, skipping",
pod.Namespace, pod.Name)
return
}
req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
JobUid: jobUid,
TaskName: taskName,
PodName: pod.Name,
PodUID: pod.UID,
Event: bus.PodEvictedEvent,
JobVersion: int32(dVersion),
}
if err := cc.cache.DeletePod(pod); err != nil {
klog.Errorf("Failed to delete Pod <%s/%s>: %v in cache",
pod.Namespace, pod.Name, err)
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
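// recordJobEvent records a normal event on the job if it is present in the cache.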
func (cc *jobcontroller) recordJobEvent(namespace, name string, event batch.JobEvent, message string) {
job, err := cc.cache.Get(jobcache.JobKeyByName(namespace, name))
if err != nil {
klog.Warningf("Failed to find job in cache when reporting job event <%s/%s>: %v",
namespace, name, err)
return
}
cc.recorder.Event(job.Job, v1.EventTypeNormal, string(event), message)
}
func (cc *jobcontroller) handleCommands() {
for cc.processNextCommand() {
}
}
func (cc *jobcontroller) processNextCommand() bool {
obj, shutdown := cc.commandQueue.Get()
if shutdown {
return false
}
cmd := obj.(*bus.Command)
defer cc.commandQueue.Done(cmd)
if err := cc.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
cc.commandQueue.AddRateLimited(cmd)
}
return true
}
cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name,
batch.CommandIssued,
fmt.Sprintf(
"Start to execute command %s, and clean it up to make sure executed not more than once.", cmd.Action))
req := apis.Request{
Namespace: cmd.Namespace,
JobName: cmd.TargetObject.Name,
Event: bus.CommandIssuedEvent,
Action: bus.Action(cmd.Action),
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
return true
}
func (cc *jobcontroller) updatePodGroup(oldObj, newObj interface{}) {
oldPG, ok := oldObj.(*scheduling.PodGroup)
if !ok {
klog.Errorf("Failed to convert %v to PodGroup", newObj)
return
}
newPG, ok := newObj.(*scheduling.PodGroup)
if !ok {
klog.Errorf("Failed to convert %v to PodGroup", newObj)
return
}
jobNameKey := newPG.Name
ors := newPG.OwnerReferences
for _, or := range ors {
if or.Kind == "Job" {
jobNameKey = or.Name
}
}
_, err := cc.cache.Get(jobcache.JobKeyByName(newPG.Namespace, jobNameKey))
if err != nil && newPG.Annotations != nil {
klog.Warningf(
"Failed to find job in cache by PodGroup(%s/%s), this may not be a PodGroup for volcano job.", newPG.Namespace, newPG.Name)
}
if newPG.Status.Phase != oldPG.Status.Phase {
req := apis.Request{
Namespace: newPG.Namespace,
JobName: jobNameKey,
}
switch newPG.Status.Phase {
case scheduling.PodGroupUnknown:
req.Event = bus.JobUnknownEvent
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
}
// TODO(k82cn): add handler for PodGroup unschedulable event.
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/job/plugins"
pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface"
)
func (cc *jobcontroller) pluginOnPodCreate(job *batch.Job, pod *v1.Pod) error {
client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
for name, args := range job.Spec.Plugins {
pb, found := plugins.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
klog.Error(err)
return err
}
klog.Infof("Starting to execute plugin at <pluginOnPodCreate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnPodCreate(pod, job); err != nil {
klog.Errorf("Failed to process on pod create plugin %s, err %v.", name, err)
return err
}
}
return nil
}
func (cc *jobcontroller) pluginOnJobAdd(job *batch.Job) error {
client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
for name, args := range job.Spec.Plugins {
pb, found := plugins.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
klog.Error(err)
return err
}
klog.Infof("Starting to execute plugin at <pluginOnJobAdd>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobAdd(job); err != nil {
klog.Errorf("Failed to process on job add plugin %s, err %v.", name, err)
return err
}
}
return nil
}
func (cc *jobcontroller) pluginOnJobDelete(job *batch.Job) error {
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
for name, args := range job.Spec.Plugins {
pb, found := plugins.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
klog.Error(err)
return err
}
klog.Infof("Starting to execute plugin at <pluginOnJobDelete>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobDelete(job); err != nil {
klog.Errorf("failed to process on job delete plugin %s, err %v.", name, err)
return err
}
}
return nil
}
func (cc *jobcontroller) pluginOnJobUpdate(job *batch.Job) error {
client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
for name, args := range job.Spec.Plugins {
pb, found := plugins.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
klog.Error(err)
return err
}
klog.Infof("Starting to execute plugin at <pluginOnJobUpdate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobUpdate(job); err != nil {
klog.Errorf("Failed to process on job update plugin %s, err %v.", name, err)
return err
}
}
return nil
}
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
func newRateLimitingQueue() workqueue.TypedRateLimitingInterface[any] {
return workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter[any](
workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 180*time.Second),
// 10 qps, 100 bucket size. This only governs retry speed, and it is an overall factor (not per item).
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
))
}
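// A minimal usage sketch (illustrative only, with a hypothetical item): the
// per-item exponential limiter backs a single item off from 5ms up to 180s,
// while the bucket limiter caps overall retry throughput at 10 qps with a
// burst of 100; the max-of combination applies whichever delay is longer.
//
//	q := newRateLimitingQueue()
//	q.AddRateLimited(item) // re-enqueued after the combined backoff
//	// ... once the item finally syncs successfully:
//	q.Forget(item) // reset its per-item backoff counter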
func (cc *jobcontroller) processResyncTask() {
obj, shutdown := cc.errTasks.Get()
if shutdown {
return
}
defer cc.errTasks.Done(obj)
// a task is resynced at most 10 times
if cc.errTasks.NumRequeues(obj) > 10 {
cc.errTasks.Forget(obj)
return
}
task, ok := obj.(*v1.Pod)
if !ok {
klog.Errorf("failed to convert %v to *v1.Pod", obj)
return
}
if err := cc.syncTask(task); err != nil {
klog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err)
cc.resyncTask(task)
}
}
func (cc *jobcontroller) syncTask(oldTask *v1.Pod) error {
newPod, err := cc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
if err := cc.cache.DeletePod(oldTask); err != nil {
klog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err)
return err
}
klog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)
return nil
}
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
}
return cc.cache.UpdatePod(newPod)
}
func (cc *jobcontroller) resyncTask(task *v1.Pod) {
cc.errTasks.AddRateLimited(task)
}
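// processResyncTask drains one item per call; a runner elsewhere in the
// controller (not shown in this excerpt) is expected to loop it, e.g.
// something like:
//
//	go wait.Until(cc.processResyncTask, 0, stopCh)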
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"sort"
"strconv"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobcache "volcano.sh/volcano/pkg/controllers/cache"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
"volcano.sh/volcano/pkg/controllers/util"
)
// MakePodName concatenates the job name, task name, and index into a pod name.
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(jobhelpers.PodNameFmt, jobName, taskName, index)
}
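// For illustration, assuming jobhelpers.PodNameFmt is "%s-%s-%d":
//
//	MakePodName("mnist", "worker", 2) // "mnist-worker-2"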
func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy batch.NumaPolicy, ix int, jobForwarding bool) *v1.Pod {
templateCopy := template.DeepCopy()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: jobhelpers.MakePodName(job.Name, template.Name, ix),
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Labels: templateCopy.Labels,
Annotations: templateCopy.Annotations,
},
Spec: templateCopy.Spec,
}
// If no scheduler name in Pod, use scheduler name from Job.
if len(pod.Spec.SchedulerName) == 0 {
pod.Spec.SchedulerName = job.Spec.SchedulerName
}
// If no priority class specified in pod template, use priority class specified in job
if len(pod.Spec.PriorityClassName) == 0 && len(job.Spec.PriorityClassName) != 0 {
pod.Spec.PriorityClassName = job.Spec.PriorityClassName
}
volumeMap := make(map[string]string)
for _, volume := range job.Spec.Volumes {
vcName := volume.VolumeClaimName
name := fmt.Sprintf("%s-%s", job.Name, jobhelpers.GenRandomStr(12))
if _, ok := volumeMap[vcName]; !ok {
volume := v1.Volume{
Name: name,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: vcName,
},
},
}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
volumeMap[vcName] = name
} else {
// duplicate volume claim names are skipped so the same claim is not mounted twice
continue
}
for i, c := range pod.Spec.Containers {
vm := v1.VolumeMount{
MountPath: volume.MountPath,
Name: name,
}
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
}
}
tsKey := templateCopy.Name
if len(tsKey) == 0 {
tsKey = batch.DefaultTaskSpec
}
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
index := strconv.Itoa(ix)
pod.Annotations[batch.TaskIndex] = index
pod.Annotations[batch.TaskSpecKey] = tsKey
pgName := job.Name + "-" + string(job.UID)
pod.Annotations[schedulingv2.KubeGroupNameAnnotationKey] = pgName
pod.Annotations[batch.JobNameKey] = job.Name
pod.Annotations[batch.QueueNameKey] = job.Spec.Queue
pod.Annotations[batch.JobVersion] = fmt.Sprintf("%d", job.Status.Version)
pod.Annotations[batch.PodTemplateKey] = fmt.Sprintf("%s-%s", job.Name, template.Name)
pod.Annotations[batch.JobRetryCountKey] = strconv.Itoa(int(job.Status.RetryCount))
if topologyPolicy != "" {
pod.Annotations[schedulingv2.NumaPolicyKey] = string(topologyPolicy)
}
if len(job.Annotations) > 0 {
if value, found := job.Annotations[schedulingv2.PodPreemptable]; found {
pod.Annotations[schedulingv2.PodPreemptable] = value
}
if value, found := job.Annotations[schedulingv2.CooldownTime]; found {
pod.Annotations[schedulingv2.CooldownTime] = value
}
if value, found := job.Annotations[schedulingv2.RevocableZone]; found {
pod.Annotations[schedulingv2.RevocableZone] = value
}
if value, found := job.Annotations[schedulingv2.JDBMinAvailable]; found {
pod.Annotations[schedulingv2.JDBMinAvailable] = value
} else if value, found := job.Annotations[schedulingv2.JDBMaxUnavailable]; found {
pod.Annotations[schedulingv2.JDBMaxUnavailable] = value
}
}
if len(pod.Labels) == 0 {
pod.Labels = make(map[string]string)
}
// Set pod labels for Service.
pod.Labels[batch.TaskIndex] = index
pod.Labels[batch.JobNameKey] = job.Name
pod.Labels[batch.TaskSpecKey] = tsKey
pod.Labels[batch.JobNamespaceKey] = job.Namespace
pod.Labels[batch.QueueNameKey] = job.Spec.Queue
if len(job.Labels) > 0 {
if value, found := job.Labels[schedulingv2.PodPreemptable]; found {
pod.Labels[schedulingv2.PodPreemptable] = value
}
if value, found := job.Labels[schedulingv2.CooldownTime]; found {
pod.Labels[schedulingv2.CooldownTime] = value
}
}
if jobForwarding {
pod.Annotations[batch.JobForwardingKey] = "true"
pod.Labels[batch.JobForwardingKey] = "true"
}
return pod
}
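// Sketch of the metadata createJobPod stamps onto each pod, with illustrative
// values for a hypothetical job "mnist" (version 1) containing a task "worker":
//
//	pod.Name = jobhelpers.MakePodName("mnist", "worker", 0) // index 0
//	pod.Annotations[batch.TaskSpecKey] = "worker"
//	pod.Annotations[batch.JobNameKey] = "mnist"
//	pod.Annotations[batch.JobVersion] = "1"
//	pod.Annotations[schedulingv2.KubeGroupNameAnnotationKey] = "mnist-" + string(job.UID)
//
// The task/job keys are mirrored into pod.Labels so Services and label
// selectors can match the pods.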
func applyPolicies(job *batch.Job, req *apis.Request) (delayAct *delayAction) {
delayAct = &delayAction{
jobKey: jobcache.JobKeyByReq(req),
event: req.Event,
taskName: req.TaskName,
podName: req.PodName,
podUID: req.PodUID,
// default action is sync job
action: v1alpha1.SyncJobAction,
}
if len(req.Action) != 0 {
delayAct.action = req.Action
return
}
// If the event is an internal event, we do not need to perform any action
if isInternalEvent(req.Event) {
return
}
// Handle the scenario where pod events accumulate while vcjobs with the same name
// are frequently recreated: a stale pod event could otherwise corrupt the status
// of the newly created vcjob that shares its name.
if len(req.JobUid) != 0 && job != nil && req.JobUid != job.UID {
klog.V(2).Infof("Request for job(%s/%s) carries uid %v, but cached job(%s/%s) has uid %v; performing %v action",
req.Namespace, req.JobName, req.JobUid, job.Namespace, job.Name, job.UID, v1alpha1.SyncJobAction)
return
}
// All requests triggered by discarded job resources perform the sync action instead
if req.JobVersion < job.Status.Version {
klog.Infof("Request %s is outdated, will perform sync instead.", req)
return
}
// Task-level policies overwrite job-level policies
if len(req.TaskName) != 0 {
// Parse task level policies
for _, task := range job.Spec.Tasks {
if task.Name == req.TaskName {
for _, policy := range task.Policies {
policyEvents := getEventlist(policy)
if len(policyEvents) > 0 && len(req.Event) > 0 {
if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
// Check whether the event requires a timeout configuration and whether a
// timeout policy is specified. If the event does not require a timeout
// (shouldConfigureTimeout returns false), or a timeout policy is already set
// (policy.Timeout != nil), take the policy's action; when a timeout is set,
// delay the action by the policy's Timeout.Duration.
if !shouldConfigureTimeout(req.Event) || policy.Timeout != nil {
delayAct.action = policy.Action
if policy.Timeout != nil {
delayAct.delay = policy.Timeout.Duration
}
return
}
}
}
// 0 is not an error code; policies with exit code 0 are rejected by the validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
delayAct.action = policy.Action
if policy.Timeout != nil {
delayAct.delay = policy.Timeout.Duration
}
return
}
}
break
}
}
}
// Parse Job level policies
for _, policy := range job.Spec.Policies {
policyEvents := getEventlist(policy)
if len(policyEvents) > 0 && len(req.Event) > 0 {
if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
if !(shouldConfigureTimeout(req.Event) && policy.Timeout == nil) {
delayAct.action = policy.Action
if policy.Timeout != nil {
delayAct.delay = policy.Timeout.Duration
}
return
}
}
}
// 0 is not an error code; policies with exit code 0 are rejected by the validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
delayAct.action = policy.Action
if policy.Timeout != nil {
delayAct.delay = policy.Timeout.Duration
}
return
}
}
return
}
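// Illustration of the resolution order in applyPolicies (hypothetical spec):
// an explicit request action wins; internal events and stale job versions fall
// back to SyncJob; otherwise task-level policies are consulted before
// job-level ones. Given
//
//	tasks["worker"].policies: [{event: PodFailed, action: RestartTask}]
//	job.policies:             [{event: PodFailed, action: RestartJob}]
//
// a PodFailedEvent for task "worker" resolves to RestartTask, while the same
// event for any other task resolves to RestartJob.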
func shouldConfigureTimeout(event v1alpha1.Event) bool {
return event == v1alpha1.PodPendingEvent
}
func getEventlist(policy batch.LifecyclePolicy) []v1alpha1.Event {
policyEventsList := policy.Events
if len(policy.Event) > 0 {
policyEventsList = append(policyEventsList, policy.Event)
}
return policyEventsList
}
func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool {
for _, event := range policyEvents {
if event == reqEvent {
return true
}
}
return false
}
// TaskPriority structure.
type TaskPriority struct {
priority int32
batch.TaskSpec
}
// TasksPriority is a slice of TaskPriority.
type TasksPriority []TaskPriority
func (p TasksPriority) Len() int { return len(p) }
func (p TasksPriority) Less(i, j int) bool {
return p[i].priority > p[j].priority
}
func (p TasksPriority) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
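// Note that Less orders by descending priority, so sort.Sort(p) places the
// highest-priority tasks first; e.g. priorities [1, 5, 3] sort to [5, 3, 1].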
func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
controllerRef := metav1.GetControllerOf(obj)
if controllerRef == nil {
return false
}
if controllerRef.APIVersion == gvk.GroupVersion().String() && controllerRef.Kind == gvk.Kind {
return true
}
return false
}
// CalcFirstCountResources returns the total resources of the first count tasks, sorted by priority
func (p TasksPriority) CalcFirstCountResources(count int32) v1.ResourceList {
sort.Sort(p)
minReq := v1.ResourceList{}
for _, task := range p {
if count <= task.Replicas {
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, count))
break
} else {
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, task.Replicas))
count -= task.Replicas
}
}
return minReq
}
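// Worked example (illustrative): with tasks already sorted to
// [A(priority 10, 2 replicas, 1 CPU each), B(priority 5, 3 replicas, 2 CPU each)]
// and count=3, A contributes 2 pods and B the remaining 1: 2*1 + 1*2 = 4 CPUs.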
// CalcPGMinResources sums up each task's minAvailable; if that is not enough, it fills up to jobMinAvailable with the tasks' remaining replicas
func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList {
sort.Sort(p)
minReq := v1.ResourceList{}
podCnt := int32(0)
// 1. first sum up those tasks whose MinAvailable is set
for _, task := range p {
if task.MinAvailable == nil { // in practice, every task's minAvailable is set by the webhook
continue
}
validReplicas := *task.MinAvailable
if left := jobMinAvailable - podCnt; left < validReplicas {
validReplicas = left
}
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, validReplicas))
podCnt += validReplicas
if podCnt >= jobMinAvailable {
break
}
}
if podCnt >= jobMinAvailable {
return minReq
}
// 2. fill the pod count up to jobMinAvailable with tasks whose replicas are not used up, higher priority first
leftCnt := jobMinAvailable - podCnt
for _, task := range p {
left := task.Replicas
if task.MinAvailable != nil {
if *task.MinAvailable == task.Replicas {
continue
} else {
left = task.Replicas - *task.MinAvailable
}
}
if leftCnt >= left {
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, left))
leftCnt -= left
} else {
minReq = quotav1.Add(minReq, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, leftCnt))
leftCnt = 0
}
if leftCnt <= 0 {
break
}
}
return minReq
}
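// Worked example (illustrative): with jobMinAvailable=5 and sorted tasks
// [A(minAvailable 2, replicas 3), B(minAvailable 1, replicas 2)], pass 1
// counts the declared minimums (2 of A, 1 of B, podCnt=3); pass 2 fills the
// remaining 2 pods from unused replicas, higher priority first (1 more of A,
// then 1 more of B).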
// isInternalEvent checks if the event is an internal event
func isInternalEvent(event v1alpha1.Event) bool {
switch event {
case v1alpha1.OutOfSyncEvent,
v1alpha1.CommandIssuedEvent,
v1alpha1.PodRunningEvent:
return true
default:
return false
}
}
// isInternalAction checks if the action is an internal action
func isInternalAction(action v1alpha1.Action) bool {
switch action {
case v1alpha1.SyncJobAction,
v1alpha1.EnqueueAction,
v1alpha1.SyncQueueAction,
v1alpha1.OpenQueueAction,
v1alpha1.CloseQueueAction:
return true
default:
return false
}
}
func GetStateAction(delayAct *delayAction) state.Action {
action := state.Action{Action: delayAct.action}
if delayAct.action == v1alpha1.RestartTaskAction {
action.Target = state.Target{TaskName: delayAct.taskName, Type: state.TargetTypeTask}
} else if delayAct.action == v1alpha1.RestartPodAction {
action.Target = state.Target{TaskName: delayAct.taskName, PodName: delayAct.podName, Type: state.TargetTypePod}
}
return action
}
type ActionType int
const (
JobAction ActionType = iota
TaskAction
PodAction
)
func GetActionType(action v1alpha1.Action) ActionType {
switch action {
case v1alpha1.AbortJobAction,
v1alpha1.RestartJobAction,
v1alpha1.TerminateJobAction,
v1alpha1.CompleteJobAction,
v1alpha1.ResumeJobAction:
return JobAction
case v1alpha1.RestartTaskAction:
return TaskAction
case v1alpha1.RestartPodAction:
return PodAction
}
return JobAction
}