/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
yurtctrlmgrconfig "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/apis/config"
)
// Config is the main context object for the controller manager.
type Config struct {
// ComponentConfig holds the complete yurt-manager configuration.
ComponentConfig yurtctrlmgrconfig.YurtManagerConfiguration
}
// completedConfig wraps Config to mark it as completed. It is unexported so a
// CompletedConfig can only be obtained through (*Config).Complete.
type completedConfig struct {
*Config
}
// CompletedConfig same as Config, just to swap private object.
type CompletedConfig struct {
// Embed a private pointer that cannot be instantiated outside of this package.
*completedConfig
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() *CompletedConfig {
	// Wrap the config in the unexported completedConfig so that a
	// CompletedConfig can only be produced through this method.
	return &CompletedConfig{&completedConfig{c}}
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package names
// Names used to register and identify the yurt-manager controllers.
const (
CsrApproverController = "csr-approver-controller"
DaemonPodUpdaterController = "daemon-pod-updater-controller"
NodePoolController = "nodepool-controller"
PlatformAdminController = "platform-admin-controller"
ServiceTopologyEndpointsController = "service-topology-endpoints-controller"
ServiceTopologyEndpointSliceController = "service-topology-endpointslice-controller"
YurtAppSetController = "yurt-app-set-controller"
YurtAppDaemonController = "yurt-app-daemon-controller"
YurtAppOverriderController = "yurt-app-overrider-controller"
YurtStaticSetController = "yurt-static-set-controller"
YurtCoordinatorCertController = "yurt-coordinator-cert-controller"
DelegateLeaseController = "delegate-lease-controller"
PodBindingController = "pod-binding-controller"
GatewayPickupController = "gateway-pickup-controller"
GatewayInternalServiceController = "gateway-internal-service-controller"
GatewayPublicServiceController = "gateway-public-service-controller"
GatewayDNSController = "gateway-dns-controller"
NodeLifeCycleController = "node-life-cycle-controller"
NodeBucketController = "node-bucket-controller"
)
// YurtManagerControllerAliases maps each short controller alias to its full
// controller name.
func YurtManagerControllerAliases() map[string]string {
	// Build a fresh map on every call so callers cannot mutate shared state.
	aliases := map[string]string{
		"csrapprover":                   CsrApproverController,
		"daemonpodupdater":              DaemonPodUpdaterController,
		"nodepool":                      NodePoolController,
		"platformadmin":                 PlatformAdminController,
		"servicetopologyendpoints":      ServiceTopologyEndpointsController,
		"servicetopologyendpointslices": ServiceTopologyEndpointSliceController,
		"yurtappset":                    YurtAppSetController,
		"yurtappdaemon":                 YurtAppDaemonController,
		"yurtappoverrider":              YurtAppOverriderController,
		"yurtstaticset":                 YurtStaticSetController,
		"yurtcoordinatorcert":           YurtCoordinatorCertController,
		"delegatelease":                 DelegateLeaseController,
		"podbinding":                    PodBindingController,
		"gatewaypickup":                 GatewayPickupController,
		"gatewayinternalservice":        GatewayInternalServiceController,
		"gatewaypublicservice":          GatewayPublicServiceController,
		"gatewaydns":                    GatewayDNSController,
		"nodelifecycle":                 NodeLifeCycleController,
		"nodebucket":                    NodeBucketController,
	}
	return aliases
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
utilpointer "k8s.io/utils/pointer"
)
// SetDefaultsNodePool set default values for NodePool.
func SetDefaultsNodePool(obj *NodePool) {
	// Nothing to do when annotations are already initialized.
	if obj.Annotations != nil {
		return
	}
	// Ensure the annotations map is always usable by callers.
	obj.Annotations = map[string]string{}
}
// SetDefaultsYurtAppSet set default values for YurtAppSet.
func SetDefaultsYurtAppSet(obj *YurtAppSet) {
	// Keep 10 revisions of history unless the user asked for something else.
	if obj.Spec.RevisionHistoryLimit == nil {
		obj.Spec.RevisionHistoryLimit = utilpointer.Int32Ptr(10)
	}
	if sts := obj.Spec.WorkloadTemplate.StatefulSetTemplate; sts != nil {
		SetDefaultPodSpec(&sts.Spec.Template.Spec)
		// Apply the upstream PVC defaulters to every volume claim template.
		for i := range sts.Spec.VolumeClaimTemplates {
			pvc := &sts.Spec.VolumeClaimTemplates[i]
			v1.SetDefaults_PersistentVolumeClaim(pvc)
			v1.SetDefaults_ResourceList(&pvc.Spec.Resources.Limits)
			v1.SetDefaults_ResourceList(&pvc.Spec.Resources.Requests)
			v1.SetDefaults_ResourceList(&pvc.Status.Capacity)
		}
	}
	if dep := obj.Spec.WorkloadTemplate.DeploymentTemplate; dep != nil {
		SetDefaultPodSpec(&dep.Spec.Template.Spec)
	}
}
// SetDefaultPodSpec sets default pod spec: it runs the upstream PodSpec
// defaulter and then applies per-volume and per-container defaults.
func SetDefaultPodSpec(in *corev1.PodSpec) {
	v1.SetDefaults_PodSpec(in)
	for i := range in.Volumes {
		setDefaultVolume(&in.Volumes[i])
	}
	for i := range in.InitContainers {
		setDefaultContainer(&in.InitContainers[i])
	}
	for i := range in.Containers {
		a := &in.Containers[i]
		// For in-place update, we set default imagePullPolicy to Always.
		// This must happen before SetDefaults_Container (inside
		// setDefaultContainer) so the generic tag-based default is not applied.
		if a.ImagePullPolicy == "" {
			a.ImagePullPolicy = corev1.PullAlways
		}
		setDefaultContainer(a)
	}
}

// setDefaultVolume applies the upstream per-volume-source defaulters to a
// single volume, including nested DownwardAPI and Projected items.
func setDefaultVolume(a *corev1.Volume) {
	v1.SetDefaults_Volume(a)
	if a.VolumeSource.HostPath != nil {
		v1.SetDefaults_HostPathVolumeSource(a.VolumeSource.HostPath)
	}
	if a.VolumeSource.Secret != nil {
		v1.SetDefaults_SecretVolumeSource(a.VolumeSource.Secret)
	}
	if a.VolumeSource.ISCSI != nil {
		v1.SetDefaults_ISCSIVolumeSource(a.VolumeSource.ISCSI)
	}
	if a.VolumeSource.RBD != nil {
		v1.SetDefaults_RBDVolumeSource(a.VolumeSource.RBD)
	}
	if a.VolumeSource.DownwardAPI != nil {
		v1.SetDefaults_DownwardAPIVolumeSource(a.VolumeSource.DownwardAPI)
		for j := range a.VolumeSource.DownwardAPI.Items {
			b := &a.VolumeSource.DownwardAPI.Items[j]
			if b.FieldRef != nil {
				v1.SetDefaults_ObjectFieldSelector(b.FieldRef)
			}
		}
	}
	if a.VolumeSource.ConfigMap != nil {
		v1.SetDefaults_ConfigMapVolumeSource(a.VolumeSource.ConfigMap)
	}
	if a.VolumeSource.AzureDisk != nil {
		v1.SetDefaults_AzureDiskVolumeSource(a.VolumeSource.AzureDisk)
	}
	if a.VolumeSource.Projected != nil {
		v1.SetDefaults_ProjectedVolumeSource(a.VolumeSource.Projected)
		for j := range a.VolumeSource.Projected.Sources {
			b := &a.VolumeSource.Projected.Sources[j]
			if b.DownwardAPI != nil {
				for k := range b.DownwardAPI.Items {
					c := &b.DownwardAPI.Items[k]
					if c.FieldRef != nil {
						v1.SetDefaults_ObjectFieldSelector(c.FieldRef)
					}
				}
			}
			if b.ServiceAccountToken != nil {
				v1.SetDefaults_ServiceAccountTokenProjection(b.ServiceAccountToken)
			}
		}
	}
	if a.VolumeSource.ScaleIO != nil {
		v1.SetDefaults_ScaleIOVolumeSource(a.VolumeSource.ScaleIO)
	}
}

// setDefaultContainer applies the container-level defaults that init
// containers and regular containers share: the upstream container defaulter,
// port protocols, env var field refs, resource lists, probes, and lifecycle
// HTTP handlers. (The previous code duplicated this body in both loops.)
func setDefaultContainer(a *corev1.Container) {
	v1.SetDefaults_Container(a)
	for j := range a.Ports {
		SetDefaults_ContainerPort(&a.Ports[j])
	}
	for j := range a.Env {
		b := &a.Env[j]
		if b.ValueFrom != nil && b.ValueFrom.FieldRef != nil {
			v1.SetDefaults_ObjectFieldSelector(b.ValueFrom.FieldRef)
		}
	}
	v1.SetDefaults_ResourceList(&a.Resources.Limits)
	v1.SetDefaults_ResourceList(&a.Resources.Requests)
	if a.LivenessProbe != nil {
		v1.SetDefaults_Probe(a.LivenessProbe)
		if a.LivenessProbe.Handler.HTTPGet != nil {
			v1.SetDefaults_HTTPGetAction(a.LivenessProbe.Handler.HTTPGet)
		}
	}
	if a.ReadinessProbe != nil {
		v1.SetDefaults_Probe(a.ReadinessProbe)
		if a.ReadinessProbe.Handler.HTTPGet != nil {
			v1.SetDefaults_HTTPGetAction(a.ReadinessProbe.Handler.HTTPGet)
		}
	}
	if a.Lifecycle != nil {
		if a.Lifecycle.PostStart != nil && a.Lifecycle.PostStart.HTTPGet != nil {
			v1.SetDefaults_HTTPGetAction(a.Lifecycle.PostStart.HTTPGet)
		}
		if a.Lifecycle.PreStop != nil && a.Lifecycle.PreStop.HTTPGet != nil {
			v1.SetDefaults_HTTPGetAction(a.Lifecycle.PreStop.HTTPGet)
		}
	}
}
// SetDefaults_ContainerPort defaults the port protocol to TCP when unset.
// TODO fix copy from https://github.com/contiv/client-go/blob/v2.0.0-alpha.1/pkg/api/v1/defaults.go#L104
func SetDefaults_ContainerPort(obj *corev1.ContainerPort) {
	if len(obj.Protocol) == 0 {
		obj.Protocol = corev1.ProtocolTCP
	}
}
// SetDefaultsYurtStaticSet sets default values for YurtStaticSet.
func SetDefaultsYurtStaticSet(obj *YurtStaticSet) {
	// Set default upgrade strategy to "AdvancedRollingUpdate" with max-unavailable to "10%"
	strategy := &obj.Spec.UpgradeStrategy
	if strategy.Type == "" {
		strategy.Type = AdvancedRollingUpdateUpgradeStrategyType
	}
	if strings.EqualFold(string(strategy.Type), string(AdvancedRollingUpdateUpgradeStrategyType)) && strategy.MaxUnavailable == nil {
		v := intstr.FromString("10%")
		strategy.MaxUnavailable = &v
	}
	// Set default RevisionHistoryLimit to 10, consistent with the other
	// defaulters in this file.
	if obj.Spec.RevisionHistoryLimit == nil {
		obj.Spec.RevisionHistoryLimit = utilpointer.Int32Ptr(10)
	}
	// obj.Spec.Template.Spec is a struct field, so its address can never be
	// nil; the previous nil check was dead code and has been removed.
	SetDefaultPodSpec(&obj.Spec.Template.Spec)
	// use YurtStaticSet name and namespace to replace name and namespace in template metadata
	obj.Spec.Template.Name = obj.Name
	obj.Spec.Template.Namespace = obj.Namespace
}
// SetDefaultsYurtAppDaemon set default values for YurtAppDaemon.
func SetDefaultsYurtAppDaemon(obj *YurtAppDaemon) {
	// Keep 10 revisions of history unless the user asked for something else.
	if obj.Spec.RevisionHistoryLimit == nil {
		obj.Spec.RevisionHistoryLimit = utilpointer.Int32Ptr(10)
	}
	if sts := obj.Spec.WorkloadTemplate.StatefulSetTemplate; sts != nil {
		SetDefaultPodSpec(&sts.Spec.Template.Spec)
		// Apply the upstream PVC defaulters to every volume claim template.
		for i := range sts.Spec.VolumeClaimTemplates {
			pvc := &sts.Spec.VolumeClaimTemplates[i]
			v1.SetDefaults_PersistentVolumeClaim(pvc)
			v1.SetDefaults_ResourceList(&pvc.Spec.Resources.Limits)
			v1.SetDefaults_ResourceList(&pvc.Spec.Resources.Requests)
			v1.SetDefaults_ResourceList(&pvc.Status.Capacity)
		}
	}
	if dep := obj.Spec.WorkloadTemplate.DeploymentTemplate; dep != nil {
		SetDefaultPodSpec(&dep.Spec.Template.Spec)
	}
}
// SetDefaultsNodeBucket set default values for NodeBucket.
func SetDefaultsNodeBucket(obj *NodeBucket) {
// example for set default value for NodeBucket
// No defaults are currently applied; this is kept as a hook for future defaulting.
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
// Package v1alpha1 contains API Schema definitions for the apps v1alpha1 API group
// +kubebuilder:object:generate=true
// +groupName=apps.openyurt.io
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/scheme"
)
var (
// GroupVersion is group version used to register these objects
GroupVersion = schema.GroupVersion{Group: "apps.openyurt.io", Version: "v1alpha1"}
// SchemeGroupVersion is an alias of GroupVersion kept for generated clients
// and listers that expect this name.
SchemeGroupVersion = GroupVersion
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
// AddToScheme adds the types in this group-version to the given scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
// Resource is required by pkg/client/listers/...
func Resource(resource string) schema.GroupResource {
	// Equivalent to SchemeGroupVersion.WithResource(resource).GroupResource():
	// pair the group with the given resource name.
	return schema.GroupResource{Group: SchemeGroupVersion.Group, Resource: resource}
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Node represents a specified node in the nodepool
type Node struct {
// Name is the name of the node.
Name string `json:"name,omitempty"`
}
// +genclient
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Cluster,path=nodebuckets,shortName=nb,categories=all
// +kubebuilder:printcolumn:name="NUM-NODES",type="integer",JSONPath=".numNodes",description="NumNodes represents the number of nodes in the NodeBucket."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
// NodeBucket is the Schema for the nodebuckets API. It holds a subset of the
// nodes that belong to a nodepool.
type NodeBucket struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
// NumNodes represents the number of nodes in the nodebucket
NumNodes int32 `json:"numNodes"`
// Nodes represents a subset nodes in the nodepool
Nodes []Node `json:"nodes"`
}
//+kubebuilder:object:root=true
// NodeBucketList contains a list of NodeBucket
type NodeBucketList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
// Items is the list of NodeBucket objects.
Items []NodeBucket `json:"items"`
}
// init registers the NodeBucket types with the scheme builder.
func init() {
SchemeBuilder.Register(&NodeBucket{}, &NodeBucketList{})
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/conversion"
"github.com/openyurtio/openyurt/pkg/apis/apps/v1beta1"
)
// ConvertTo converts this v1alpha1 NodePool to the v1beta1 hub version.
func (src *NodePool) ConvertTo(dstRaw conversion.Hub) error {
	dst := dstRaw.(*v1beta1.NodePool)
	dst.ObjectMeta = src.ObjectMeta

	// Copy over the spec field by field; the types differ only in package.
	spec := &dst.Spec
	spec.Type = v1beta1.NodePoolType(src.Spec.Type)
	spec.Labels = src.Spec.Labels
	spec.Annotations = src.Spec.Annotations
	spec.Taints = src.Spec.Taints

	// Status fields carry over unchanged.
	status := &dst.Status
	status.ReadyNodeNum = src.Status.ReadyNodeNum
	status.UnreadyNodeNum = src.Status.UnreadyNodeNum
	status.Nodes = src.Status.Nodes

	klog.V(4).Infof("convert from v1alpha1 to v1beta1 for nodepool %s", dst.Name)
	return nil
}
// ConvertFrom converts the v1beta1 hub version to this v1alpha1 NodePool.
func (dst *NodePool) ConvertFrom(srcRaw conversion.Hub) error {
	src := srcRaw.(*v1beta1.NodePool)
	dst.ObjectMeta = src.ObjectMeta

	// Copy over the spec field by field; the types differ only in package.
	spec := &dst.Spec
	spec.Type = NodePoolType(src.Spec.Type)
	spec.Labels = src.Spec.Labels
	spec.Annotations = src.Spec.Annotations
	spec.Taints = src.Spec.Taints

	// Status fields carry over unchanged.
	status := &dst.Status
	status.ReadyNodeNum = src.Status.ReadyNodeNum
	status.UnreadyNodeNum = src.Status.UnreadyNodeNum
	status.Nodes = src.Status.Nodes

	klog.V(4).Infof("convert from v1beta1 to v1alpha1 for nodepool %s", dst.Name)
	return nil
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// NodePoolType identifies the kind of nodes grouped by a NodePool.
type NodePoolType string
const (
// Edge indicates a pool of edge nodes.
Edge NodePoolType = "Edge"
// Cloud indicates a pool of cloud nodes.
Cloud NodePoolType = "Cloud"
)
// NodePoolSpec defines the desired state of NodePool
type NodePoolSpec struct {
// The type of the NodePool
// +optional
Type NodePoolType `json:"type,omitempty"`
// A label query over nodes to consider for adding to the pool
// +optional
Selector *metav1.LabelSelector `json:"selector,omitempty"`
// If specified, the Labels will be added to all nodes.
// NOTE: existing labels with the same keys on the nodes will be overwritten.
// +optional
Labels map[string]string `json:"labels,omitempty"`
// If specified, the Annotations will be added to all nodes.
// NOTE: existing annotations with the same keys on the nodes will be overwritten.
// +optional
Annotations map[string]string `json:"annotations,omitempty"`
// If specified, the Taints will be added to all nodes.
// +optional
Taints []v1.Taint `json:"taints,omitempty"`
}
// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Total number of ready nodes in the pool.
// +optional
ReadyNodeNum int32 `json:"readyNodeNum"`
// Total number of unready nodes in the pool.
// +optional
UnreadyNodeNum int32 `json:"unreadyNodeNum"`
// The list of nodes' names in the pool.
// +optional
Nodes []string `json:"nodes,omitempty"`
}
// +genclient:nonNamespaced
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,path=nodepools,shortName=np,categories=all
// +kubebuilder:subresource:status
// +kubebuilder:deprecatedversion:warning="apps.openyurt.io/v1alpha1 NodePool is deprecated in v1.0.0+; use apps.openyurt.io/v1beta1 NodePool"
// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.type",description="The type of nodepool"
// +kubebuilder:printcolumn:name="ReadyNodes",type="integer",JSONPath=".status.readyNodeNum",description="The number of ready nodes in the pool"
// +kubebuilder:printcolumn:name="NotReadyNodes",type="integer",JSONPath=".status.unreadyNodeNum"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// NodePool is the Schema for the nodepools API. Deprecated in favor of the
// v1beta1 version (see the deprecatedversion marker above).
type NodePool struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec NodePoolSpec `json:"spec,omitempty"`
Status NodePoolStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// NodePoolList contains a list of NodePool
type NodePoolList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
// Items is the list of NodePool objects.
Items []NodePool `json:"items"`
}
// init registers the NodePool types with the scheme builder.
func init() {
SchemeBuilder.Register(&NodePool{}, &NodePoolList{})
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// YurtAppDaemonConditionType indicates valid conditions type of a YurtAppDaemon.
type YurtAppDaemonConditionType string
const (
// WorkLoadProvisioned means all the expected workload are provisioned
WorkLoadProvisioned YurtAppDaemonConditionType = "WorkLoadProvisioned"
// WorkLoadUpdated means all the workload are updated.
WorkLoadUpdated YurtAppDaemonConditionType = "WorkLoadUpdated"
// WorkLoadFailure is added to a YurtAppDaemon when one of its workloads has failure during its own reconciling.
WorkLoadFailure YurtAppDaemonConditionType = "WorkLoadFailure"
)
// YurtAppDaemonSpec defines the desired state of YurtAppDaemon
type YurtAppDaemonSpec struct {
// Selector is a label query over pods that should match the replica count.
// It must match the pod template's labels.
Selector *metav1.LabelSelector `json:"selector"`
// WorkloadTemplate describes the pool that will be created.
// +optional
WorkloadTemplate WorkloadTemplate `json:"workloadTemplate,omitempty"`
// NodePoolSelector is a label query over nodepool that should match the replica count.
// It must match the nodepool's labels.
NodePoolSelector *metav1.LabelSelector `json:"nodepoolSelector"`
// Indicates the number of histories to be conserved.
// If unspecified, defaults to 10.
// +optional
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
}
// YurtAppDaemonStatus defines the observed state of YurtAppDaemon
type YurtAppDaemonStatus struct {
// ObservedGeneration is the most recent generation observed for this YurtAppDaemon. It corresponds to the
// YurtAppDaemon's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
// Count of hash collisions for the YurtAppDaemon. The YurtAppDaemon controller
// uses this field as a collision avoidance mechanism when it needs to
// create the name for the newest ControllerRevision.
// +optional
CollisionCount *int32 `json:"collisionCount,omitempty"`
// CurrentRevision, if not empty, indicates the current version of the YurtAppDaemon.
CurrentRevision string `json:"currentRevision"`
// Represents the latest available observations of a YurtAppDaemon's current state.
// +optional
Conditions []YurtAppDaemonCondition `json:"conditions,omitempty"`
// OverriderRef records the name of the overrider bound to this YurtAppDaemon, if any.
OverriderRef string `json:"overriderRef,omitempty"`
// Records the topology detailed information of each workload.
// +optional
WorkloadSummaries []WorkloadSummary `json:"workloadSummary,omitempty"`
// TemplateType indicates the type of PoolTemplate
TemplateType TemplateType `json:"templateType"`
// NodePools indicates the list of node pools selected by YurtAppDaemon
NodePools []string `json:"nodepools,omitempty"`
}
// YurtAppDaemonCondition describes current state of a YurtAppDaemon.
type YurtAppDaemonCondition struct {
// Type of YurtAppDaemon condition.
Type YurtAppDaemonConditionType `json:"type,omitempty"`
// Status of the condition, one of True, False, Unknown.
Status corev1.ConditionStatus `json:"status,omitempty"`
// Last time the condition transitioned from one status to another.
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// The reason for the condition's last transition.
Reason string `json:"reason,omitempty"`
// A human readable message indicating details about the transition.
Message string `json:"message,omitempty"`
}
// +genclient
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Namespaced,path=yurtappdaemons,shortName=yad,categories=all
// +kubebuilder:printcolumn:name="WorkloadTemplate",type="string",JSONPath=".status.templateType",description="The WorkloadTemplate Type."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
// +kubebuilder:printcolumn:name="OverriderRef",type="string",JSONPath=".status.overriderRef",description="The name of overrider bound to this yurtappdaemon"
// YurtAppDaemon is the Schema for the yurtappdaemons API.
type YurtAppDaemon struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec YurtAppDaemonSpec `json:"spec,omitempty"`
Status YurtAppDaemonStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// YurtAppDaemonList contains a list of YurtAppDaemon
type YurtAppDaemonList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
// Items is the list of YurtAppDaemon objects.
Items []YurtAppDaemon `json:"items"`
}
// init registers the YurtAppDaemon types with the scheme builder.
func init() {
SchemeBuilder.Register(&YurtAppDaemon{}, &YurtAppDaemonList{})
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// ImageItem specifies the corresponding container and the claimed image
type ImageItem struct {
// ContainerName represents name of the container
// in which the Image will be replaced
ContainerName string `json:"containerName"`
// ImageClaim represents the claimed image name
// which is injected into the container above
ImageClaim string `json:"imageClaim"`
}
// Item represents configuration to be injected.
// Only one of its members may be specified.
type Item struct {
// Image overrides a container's image.
// +optional
Image *ImageItem `json:"image,omitempty"`
// Replicas overrides the workload's replica count.
// +optional
Replicas *int32 `json:"replicas,omitempty"`
}
// Operation is a JSON-patch operation kind.
type Operation string
const (
ADD Operation = "add" // json patch
REMOVE Operation = "remove" // json patch
REPLACE Operation = "replace" // json patch
)
// Patch describes a single JSON-patch operation applied to a workload.
type Patch struct {
// Path represents the path in the json patch
Path string `json:"path"`
// Operation represents the operation
// +kubebuilder:validation:Enum=add;remove;replace
Operation Operation `json:"operation"`
// Indicates the value of json patch
// +optional
Value apiextensionsv1.JSON `json:"value,omitempty"`
}
// Entry describes detailed multi-region configuration of the subject:
// a set of nodepools and their shared or identical configurations.
type Entry struct {
// Pools lists the nodepools these overrides apply to.
Pools []string `json:"pools"`
// +optional
Items []Item `json:"items,omitempty"`
// Convert Patch struct into json patch operation
// +optional
Patches []Patch `json:"patches,omitempty"`
}
// Subject describes the object the Entries belong to.
type Subject struct {
metav1.TypeMeta `json:",inline"`
// Name is the name of YurtAppSet or YurtAppDaemon
Name string `json:"name"`
}
// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:shortName=yao
// +kubebuilder:printcolumn:name="Subject",type="string",JSONPath=".subject.kind",description="The subject kind of this overrider."
// +kubebuilder:printcolumn:name="Name",type="string",JSONPath=".subject.name",description="The subject name of this overrider."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
// YurtAppOverrider holds per-nodepool override entries for the YurtAppSet or
// YurtAppDaemon named in Subject.
type YurtAppOverrider struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Subject Subject `json:"subject"`
Entries []Entry `json:"entries"`
}
// YurtAppOverriderList contains a list of YurtAppOverrider
// +kubebuilder:object:root=true
type YurtAppOverriderList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
// Items is the list of YurtAppOverrider objects.
Items []YurtAppOverrider `json:"items"`
}
// init registers the YurtAppOverrider types with the scheme builder.
func init() {
SchemeBuilder.Register(&YurtAppOverrider{}, &YurtAppOverriderList{})
}
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@CHANGELOG
OpenYurt Authors:
change UnitedDeployment API Definition
*/
package v1alpha1
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// TemplateType identifies the kind of workload template used in WorkloadTemplate.
type TemplateType string
const (
// StatefulSetTemplateType indicates the workload template is a StatefulSet.
StatefulSetTemplateType TemplateType = "StatefulSet"
// DeploymentTemplateType indicates the workload template is a Deployment.
DeploymentTemplateType TemplateType = "Deployment"
)
// YurtAppSetConditionType indicates valid conditions type of a YurtAppSet.
type YurtAppSetConditionType string
const (
// PoolProvisioned means all the expected pools are provisioned and unexpected pools are deleted.
PoolProvisioned YurtAppSetConditionType = "PoolProvisioned"
// PoolUpdated means all the pools are updated.
PoolUpdated YurtAppSetConditionType = "PoolUpdated"
// PoolFailure is added to a YurtAppSet when one of its pools has failure during its own reconciling.
PoolFailure YurtAppSetConditionType = "PoolFailure"
)
// YurtAppSetSpec defines the desired state of YurtAppSet.
type YurtAppSetSpec struct {
// Selector is a label query over pods that should match the replica count.
// It must match the pod template's labels.
Selector *metav1.LabelSelector `json:"selector"`
// WorkloadTemplate describes the pool that will be created.
// +optional
WorkloadTemplate WorkloadTemplate `json:"workloadTemplate,omitempty"`
// Topology describes the pods distribution detail between each of pools.
// +optional
Topology Topology `json:"topology,omitempty"`
// Indicates the number of histories to be conserved.
// If unspecified, defaults to 10.
// +optional
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
}
// WorkloadTemplate defines the pool template under the YurtAppSet.
// YurtAppSet will provision every pool based on one workload template in WorkloadTemplate.
// WorkloadTemplate currently supports StatefulSet and Deployment.
// Only one of its members may be specified.
type WorkloadTemplate struct {
// StatefulSet template
// +optional
StatefulSetTemplate *StatefulSetTemplateSpec `json:"statefulSetTemplate,omitempty"`
// Deployment template
// +optional
DeploymentTemplate *DeploymentTemplateSpec `json:"deploymentTemplate,omitempty"`
}
// StatefulSetTemplateSpec defines the pool template of StatefulSet.
// The Schemaless/PreserveUnknownFields markers keep the embedded apps/v1
// StatefulSet schema out of the generated CRD so arbitrary valid content
// is accepted.
type StatefulSetTemplateSpec struct {
	// Standard object metadata applied to each provisioned StatefulSet.
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec is the apps/v1 StatefulSetSpec used for each pool workload.
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	Spec appsv1.StatefulSetSpec `json:"spec"`
}

// DeploymentTemplateSpec defines the pool template of Deployment.
// See StatefulSetTemplateSpec for the meaning of the Schemaless markers.
type DeploymentTemplateSpec struct {
	// Standard object metadata applied to each provisioned Deployment.
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec is the apps/v1 DeploymentSpec used for each pool workload.
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	Spec appsv1.DeploymentSpec `json:"spec"`
}
// Topology defines the spread detail of each pool under YurtAppSet.
// A YurtAppSet manages multiple homogeneous workloads which are called pool.
// Each of pools under the YurtAppSet is described in Topology.
type Topology struct {
	// Contains the details of each pool. Each element in this array represents one pool
	// which will be provisioned and managed by YurtAppSet.
	// +optional
	Pools []Pool `json:"pools,omitempty"`
}
// Pool defines the detail of a pool.
type Pool struct {
	// Indicates pool name as a DNS_LABEL, which will be used to generate
	// pool workload name prefix in the format '<deployment-name>-<pool-name>-'.
	// Name should be unique between all of the pools under one YurtAppSet.
	// Name is the NodePool name.
	Name string `json:"name"`

	// Indicates the node selector to form the pool. Depending on the node selector,
	// pods provisioned could be distributed across multiple groups of nodes.
	// A pool's nodeSelectorTerm is not allowed to be updated.
	// +optional
	NodeSelectorTerm corev1.NodeSelectorTerm `json:"nodeSelectorTerm,omitempty"`

	// Indicates the tolerations the pods under this pool have.
	// A pool's tolerations is not allowed to be updated.
	// +optional
	Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

	// Indicates the number of the pod to be created under this pool.
	// +required
	Replicas *int32 `json:"replicas,omitempty"`

	// Indicates the patch for the templateSpec.
	// Currently supports strategic merge patch: https://kubernetes.io/docs/tasks/manage-kubernetes-objects/update-api-object-kubectl-patch/#notes-on-the-strategic-merge-patch
	// Patch takes precedence over the Replicas field:
	// if the Patch also modifies the Replicas, the Replicas value in the Patch is used.
	// +optional
	// +kubebuilder:pruning:PreserveUnknownFields
	Patch *runtime.RawExtension `json:"patch,omitempty"`
}
// YurtAppSetStatus defines the observed state of YurtAppSet.
type YurtAppSetStatus struct {
	// ObservedGeneration is the most recent generation observed for this YurtAppSet. It corresponds to the
	// YurtAppSet's generation, which is updated on mutation by the API Server.
	// +optional
	ObservedGeneration int64 `json:"observedGeneration,omitempty"`

	// Count of hash collisions for the YurtAppSet. The YurtAppSet controller
	// uses this field as a collision avoidance mechanism when it needs to
	// create the name for the newest ControllerRevision.
	// +optional
	CollisionCount *int32 `json:"collisionCount,omitempty"`

	// CurrentRevision, if not empty, indicates the current version of the YurtAppSet.
	CurrentRevision string `json:"currentRevision"`

	// Represents the latest available observations of a YurtAppSet's current state.
	// +optional
	Conditions []YurtAppSetCondition `json:"conditions,omitempty"`

	// Records the topology detailed information of each workload.
	// NOTE: json key is singular ("workloadSummary") for historical/API
	// compatibility reasons even though the field holds a list.
	// +optional
	WorkloadSummaries []WorkloadSummary `json:"workloadSummary,omitempty"`

	// OverriderRef records the name of the YurtAppOverrider bound to this YurtAppSet
	// (surfaced by the OverriderRef printer column on the CRD).
	OverriderRef string `json:"overriderRef,omitempty"`

	// Records the topology detail information of the replicas of each pool.
	// +optional
	PoolReplicas map[string]int32 `json:"poolReplicas,omitempty"`

	// The number of ready replicas.
	// +optional
	ReadyReplicas int32 `json:"readyReplicas"`

	// Replicas is the most recently observed number of replicas.
	Replicas int32 `json:"replicas"`

	// TemplateType indicates the type of PoolTemplate.
	TemplateType TemplateType `json:"templateType"`
}
// WorkloadSummary records the observed status of one workload managed by
// a YurtAppSet.
type WorkloadSummary struct {
	// AvailableCondition is the availability condition status of the workload
	// (one of True, False, Unknown).
	AvailableCondition corev1.ConditionStatus `json:"availableCondition"`

	// Replicas is the observed number of replicas of the workload.
	Replicas int32 `json:"replicas"`

	// ReadyReplicas is the observed number of ready replicas of the workload.
	ReadyReplicas int32 `json:"readyReplicas"`

	// WorkloadName is the name of the workload object.
	WorkloadName string `json:"workloadName"`
}
// YurtAppSetCondition describes current state of a YurtAppSet.
type YurtAppSetCondition struct {
	// Type of in place set condition.
	Type YurtAppSetConditionType `json:"type,omitempty"`

	// Status of the condition, one of True, False, Unknown.
	Status corev1.ConditionStatus `json:"status,omitempty"`

	// Last time the condition transitioned from one status to another.
	LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`

	// The reason for the condition's last transition.
	Reason string `json:"reason,omitempty"`

	// A human readable message indicating details about the transition.
	Message string `json:"message,omitempty"`
}
// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=yas
// +kubebuilder:printcolumn:name="READY",type="integer",JSONPath=".status.readyReplicas",description="The number of pods ready."
// +kubebuilder:printcolumn:name="WorkloadTemplate",type="string",JSONPath=".status.templateType",description="The WorkloadTemplate Type."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
// +kubebuilder:printcolumn:name="OverriderRef",type="string",JSONPath=".status.overriderRef",description="The name of overrider bound to this yurtappset"

// YurtAppSet is the Schema for the yurtAppSets API.
type YurtAppSet struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec defines the desired state of the YurtAppSet.
	Spec YurtAppSetSpec `json:"spec,omitempty"`
	// Status is the most recently observed state of the YurtAppSet.
	Status YurtAppSetStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true

// YurtAppSetList contains a list of YurtAppSet.
type YurtAppSetList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`

	// Items is the list of YurtAppSets.
	Items []YurtAppSet `json:"items"`
}
// init registers the YurtAppSet kinds with the package's SchemeBuilder so
// that clients built from this scheme can recognize them.
func init() {
	SchemeBuilder.Register(&YurtAppSet{})
	SchemeBuilder.Register(&YurtAppSetList{})
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
// YurtStaticSetUpgradeStrategy defines a strategy to upgrade static pods.
type YurtStaticSetUpgradeStrategy struct {
	// Type of YurtStaticSet upgrade. Can be "AdvancedRollingUpdate" or "OTA".
	Type YurtStaticSetUpgradeStrategyType `json:"type,omitempty"`

	// AdvancedRollingUpdate upgrade config params. Present only if type = "AdvancedRollingUpdate".
	//+optional
	MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
}

// YurtStaticSetUpgradeStrategyType is a strategy according to which static pods gets upgraded.
type YurtStaticSetUpgradeStrategyType string

const (
	// AdvancedRollingUpdateUpgradeStrategyType replaces static pods via a rolling update.
	AdvancedRollingUpdateUpgradeStrategyType YurtStaticSetUpgradeStrategyType = "AdvancedRollingUpdate"
	// OTAUpgradeStrategyType indicates over-the-air upgrade of static pods.
	OTAUpgradeStrategyType YurtStaticSetUpgradeStrategyType = "OTA"
)
// YurtStaticSetSpec defines the desired state of YurtStaticSet.
type YurtStaticSetSpec struct {
	// StaticPodManifest indicates the file name of static pod manifest.
	// The corresponding manifest file name is `StaticPodManifest.yaml`.
	StaticPodManifest string `json:"staticPodManifest,omitempty"`

	// An upgrade strategy to replace existing static pods with new ones.
	UpgradeStrategy YurtStaticSetUpgradeStrategy `json:"upgradeStrategy,omitempty"`

	// The number of old history to retain to allow rollback.
	// Defaults to 10.
	// +optional
	RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

	// An object that describes the desired spec of static pod.
	// Schemaless/PreserveUnknownFields keep the embedded pod template out of
	// the generated CRD schema so arbitrary valid pod content is accepted.
	// +optional
	// +kubebuilder:pruning:PreserveUnknownFields
	// +kubebuilder:validation:Schemaless
	Template corev1.PodTemplateSpec `json:"template,omitempty"`
}
// YurtStaticSetStatus defines the observed state of YurtStaticSet.
type YurtStaticSetStatus struct {
	// The total number of nodes that are running the static pod.
	TotalNumber int32 `json:"totalNumber"`

	// The number of ready static pods.
	ReadyNumber int32 `json:"readyNumber"`

	// The number of nodes that are running updated static pod.
	UpgradedNumber int32 `json:"upgradedNumber"`

	// The most recent generation observed by the static pod controller.
	// +optional
	ObservedGeneration int64 `json:"observedGeneration"`
}
// +genclient
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=yss
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
//+kubebuilder:printcolumn:name="Total",type="integer",JSONPath=".status.totalNumber",description="The total number of static pods"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyNumber",description="The number of ready static pods"
//+kubebuilder:printcolumn:name="Upgraded",type="integer",JSONPath=".status.upgradedNumber",description="The number of static pods that have been upgraded"

// YurtStaticSet is the Schema for the yurtstaticsets API.
type YurtStaticSet struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec defines the desired state of the YurtStaticSet.
	Spec YurtStaticSetSpec `json:"spec,omitempty"`
	// Status is the most recently observed state of the YurtStaticSet.
	Status YurtStaticSetStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true

// YurtStaticSetList contains a list of YurtStaticSet.
type YurtStaticSetList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`

	// Items is the list of YurtStaticSets.
	Items []YurtStaticSet `json:"items"`
}
// init registers the YurtStaticSet kinds with the package's SchemeBuilder so
// that clients built from this scheme can recognize them.
func init() {
	SchemeBuilder.Register(&YurtStaticSet{})
	SchemeBuilder.Register(&YurtStaticSetList{})
}
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by controller-gen. DO NOT EDIT.
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeploymentTemplateSpec) DeepCopyInto(out *DeploymentTemplateSpec) {
	*out = *in
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentTemplateSpec.
func (in *DeploymentTemplateSpec) DeepCopy() *DeploymentTemplateSpec {
	if in == nil {
		return nil
	}
	out := new(DeploymentTemplateSpec)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Entry) DeepCopyInto(out *Entry) {
	*out = *in
	// nil slices stay nil; non-nil slices get freshly allocated copies so
	// the copy never aliases the source.
	if in.Pools != nil {
		in, out := &in.Pools, &out.Pools
		*out = make([]string, len(*in))
		copy(*out, *in)
	}
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]Item, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	if in.Patches != nil {
		in, out := &in.Patches, &out.Patches
		*out = make([]Patch, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Entry.
func (in *Entry) DeepCopy() *Entry {
	if in == nil {
		return nil
	}
	out := new(Entry)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ImageItem) DeepCopyInto(out *ImageItem) {
	// ImageItem contains only value fields, so a shallow assignment suffices.
	*out = *in
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ImageItem.
func (in *ImageItem) DeepCopy() *ImageItem {
	if in == nil {
		return nil
	}
	out := new(ImageItem)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Item) DeepCopyInto(out *Item) {
	*out = *in
	if in.Image != nil {
		in, out := &in.Image, &out.Image
		*out = new(ImageItem)
		**out = **in
	}
	if in.Replicas != nil {
		in, out := &in.Replicas, &out.Replicas
		*out = new(int32)
		**out = **in
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Item.
func (in *Item) DeepCopy() *Item {
	if in == nil {
		return nil
	}
	out := new(Item)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Node) DeepCopyInto(out *Node) {
	// Node contains only value fields, so a shallow assignment suffices.
	*out = *in
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Node.
func (in *Node) DeepCopy() *Node {
	if in == nil {
		return nil
	}
	out := new(Node)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodeBucket) DeepCopyInto(out *NodeBucket) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	if in.Nodes != nil {
		in, out := &in.Nodes, &out.Nodes
		*out = make([]Node, len(*in))
		copy(*out, *in)
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeBucket.
func (in *NodeBucket) DeepCopy() *NodeBucket {
	if in == nil {
		return nil
	}
	out := new(NodeBucket)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NodeBucket) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodeBucketList) DeepCopyInto(out *NodeBucketList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]NodeBucket, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeBucketList.
func (in *NodeBucketList) DeepCopy() *NodeBucketList {
	if in == nil {
		return nil
	}
	out := new(NodeBucketList)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NodeBucketList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePool) DeepCopyInto(out *NodePool) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	in.Status.DeepCopyInto(&out.Status)
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePool.
func (in *NodePool) DeepCopy() *NodePool {
	if in == nil {
		return nil
	}
	out := new(NodePool)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NodePool) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePoolList) DeepCopyInto(out *NodePoolList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]NodePool, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePoolList.
func (in *NodePoolList) DeepCopy() *NodePoolList {
	if in == nil {
		return nil
	}
	out := new(NodePoolList)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NodePoolList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePoolSpec) DeepCopyInto(out *NodePoolSpec) {
	*out = *in
	if in.Selector != nil {
		in, out := &in.Selector, &out.Selector
		*out = new(v1.LabelSelector)
		(*in).DeepCopyInto(*out)
	}
	// Maps are rebuilt key-by-key so the copy never aliases the source map.
	if in.Labels != nil {
		in, out := &in.Labels, &out.Labels
		*out = make(map[string]string, len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
	if in.Annotations != nil {
		in, out := &in.Annotations, &out.Annotations
		*out = make(map[string]string, len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
	if in.Taints != nil {
		in, out := &in.Taints, &out.Taints
		*out = make([]corev1.Taint, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePoolSpec.
func (in *NodePoolSpec) DeepCopy() *NodePoolSpec {
	if in == nil {
		return nil
	}
	out := new(NodePoolSpec)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePoolStatus) DeepCopyInto(out *NodePoolStatus) {
	*out = *in
	if in.Nodes != nil {
		in, out := &in.Nodes, &out.Nodes
		*out = make([]string, len(*in))
		copy(*out, *in)
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePoolStatus.
func (in *NodePoolStatus) DeepCopy() *NodePoolStatus {
	if in == nil {
		return nil
	}
	out := new(NodePoolStatus)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Patch) DeepCopyInto(out *Patch) {
	*out = *in
	in.Value.DeepCopyInto(&out.Value)
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Patch.
func (in *Patch) DeepCopy() *Patch {
	if in == nil {
		return nil
	}
	out := new(Patch)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Pool) DeepCopyInto(out *Pool) {
	*out = *in
	in.NodeSelectorTerm.DeepCopyInto(&out.NodeSelectorTerm)
	if in.Tolerations != nil {
		in, out := &in.Tolerations, &out.Tolerations
		*out = make([]corev1.Toleration, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	if in.Replicas != nil {
		in, out := &in.Replicas, &out.Replicas
		*out = new(int32)
		**out = **in
	}
	if in.Patch != nil {
		in, out := &in.Patch, &out.Patch
		*out = new(runtime.RawExtension)
		(*in).DeepCopyInto(*out)
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Pool.
func (in *Pool) DeepCopy() *Pool {
	if in == nil {
		return nil
	}
	out := new(Pool)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StatefulSetTemplateSpec) DeepCopyInto(out *StatefulSetTemplateSpec) {
	*out = *in
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatefulSetTemplateSpec.
func (in *StatefulSetTemplateSpec) DeepCopy() *StatefulSetTemplateSpec {
	if in == nil {
		return nil
	}
	out := new(StatefulSetTemplateSpec)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Subject) DeepCopyInto(out *Subject) {
	// Subject contains only value fields, so shallow assignments suffice.
	*out = *in
	out.TypeMeta = in.TypeMeta
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Subject.
func (in *Subject) DeepCopy() *Subject {
	if in == nil {
		return nil
	}
	out := new(Subject)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Topology) DeepCopyInto(out *Topology) {
	*out = *in
	if in.Pools != nil {
		in, out := &in.Pools, &out.Pools
		*out = make([]Pool, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Topology.
func (in *Topology) DeepCopy() *Topology {
	if in == nil {
		return nil
	}
	out := new(Topology)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkloadSummary) DeepCopyInto(out *WorkloadSummary) {
	// WorkloadSummary contains only value fields, so a shallow assignment suffices.
	*out = *in
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadSummary.
func (in *WorkloadSummary) DeepCopy() *WorkloadSummary {
	if in == nil {
		return nil
	}
	out := new(WorkloadSummary)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkloadTemplate) DeepCopyInto(out *WorkloadTemplate) {
	*out = *in
	if in.StatefulSetTemplate != nil {
		in, out := &in.StatefulSetTemplate, &out.StatefulSetTemplate
		*out = new(StatefulSetTemplateSpec)
		(*in).DeepCopyInto(*out)
	}
	if in.DeploymentTemplate != nil {
		in, out := &in.DeploymentTemplate, &out.DeploymentTemplate
		*out = new(DeploymentTemplateSpec)
		(*in).DeepCopyInto(*out)
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadTemplate.
func (in *WorkloadTemplate) DeepCopy() *WorkloadTemplate {
	if in == nil {
		return nil
	}
	out := new(WorkloadTemplate)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppDaemon) DeepCopyInto(out *YurtAppDaemon) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	in.Status.DeepCopyInto(&out.Status)
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppDaemon.
func (in *YurtAppDaemon) DeepCopy() *YurtAppDaemon {
	if in == nil {
		return nil
	}
	out := new(YurtAppDaemon)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtAppDaemon) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppDaemonCondition) DeepCopyInto(out *YurtAppDaemonCondition) {
	*out = *in
	in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppDaemonCondition.
func (in *YurtAppDaemonCondition) DeepCopy() *YurtAppDaemonCondition {
	if in == nil {
		return nil
	}
	out := new(YurtAppDaemonCondition)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppDaemonList) DeepCopyInto(out *YurtAppDaemonList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]YurtAppDaemon, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppDaemonList.
func (in *YurtAppDaemonList) DeepCopy() *YurtAppDaemonList {
	if in == nil {
		return nil
	}
	out := new(YurtAppDaemonList)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtAppDaemonList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppDaemonSpec) DeepCopyInto(out *YurtAppDaemonSpec) {
	*out = *in
	if in.Selector != nil {
		in, out := &in.Selector, &out.Selector
		*out = new(v1.LabelSelector)
		(*in).DeepCopyInto(*out)
	}
	in.WorkloadTemplate.DeepCopyInto(&out.WorkloadTemplate)
	if in.NodePoolSelector != nil {
		in, out := &in.NodePoolSelector, &out.NodePoolSelector
		*out = new(v1.LabelSelector)
		(*in).DeepCopyInto(*out)
	}
	if in.RevisionHistoryLimit != nil {
		in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit
		*out = new(int32)
		**out = **in
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppDaemonSpec.
func (in *YurtAppDaemonSpec) DeepCopy() *YurtAppDaemonSpec {
	if in == nil {
		return nil
	}
	out := new(YurtAppDaemonSpec)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppDaemonStatus) DeepCopyInto(out *YurtAppDaemonStatus) {
	*out = *in
	if in.CollisionCount != nil {
		in, out := &in.CollisionCount, &out.CollisionCount
		*out = new(int32)
		**out = **in
	}
	if in.Conditions != nil {
		in, out := &in.Conditions, &out.Conditions
		*out = make([]YurtAppDaemonCondition, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	// WorkloadSummary has only value fields, so copy() is a full deep copy here.
	if in.WorkloadSummaries != nil {
		in, out := &in.WorkloadSummaries, &out.WorkloadSummaries
		*out = make([]WorkloadSummary, len(*in))
		copy(*out, *in)
	}
	if in.NodePools != nil {
		in, out := &in.NodePools, &out.NodePools
		*out = make([]string, len(*in))
		copy(*out, *in)
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppDaemonStatus.
func (in *YurtAppDaemonStatus) DeepCopy() *YurtAppDaemonStatus {
	if in == nil {
		return nil
	}
	out := new(YurtAppDaemonStatus)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppOverrider) DeepCopyInto(out *YurtAppOverrider) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	out.Subject = in.Subject
	if in.Entries != nil {
		in, out := &in.Entries, &out.Entries
		*out = make([]Entry, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppOverrider.
func (in *YurtAppOverrider) DeepCopy() *YurtAppOverrider {
	if in == nil {
		return nil
	}
	out := new(YurtAppOverrider)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtAppOverrider) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppOverriderList) DeepCopyInto(out *YurtAppOverriderList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]YurtAppOverrider, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppOverriderList.
func (in *YurtAppOverriderList) DeepCopy() *YurtAppOverriderList {
if in == nil {
return nil
}
out := new(YurtAppOverriderList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtAppOverriderList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppSet) DeepCopyInto(out *YurtAppSet) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppSet.
func (in *YurtAppSet) DeepCopy() *YurtAppSet {
if in == nil {
return nil
}
out := new(YurtAppSet)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtAppSet) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppSetCondition) DeepCopyInto(out *YurtAppSetCondition) {
*out = *in
in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppSetCondition.
func (in *YurtAppSetCondition) DeepCopy() *YurtAppSetCondition {
if in == nil {
return nil
}
out := new(YurtAppSetCondition)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppSetList) DeepCopyInto(out *YurtAppSetList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]YurtAppSet, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppSetList.
func (in *YurtAppSetList) DeepCopy() *YurtAppSetList {
if in == nil {
return nil
}
out := new(YurtAppSetList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtAppSetList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppSetSpec) DeepCopyInto(out *YurtAppSetSpec) {
*out = *in
if in.Selector != nil {
in, out := &in.Selector, &out.Selector
*out = new(v1.LabelSelector)
(*in).DeepCopyInto(*out)
}
in.WorkloadTemplate.DeepCopyInto(&out.WorkloadTemplate)
in.Topology.DeepCopyInto(&out.Topology)
if in.RevisionHistoryLimit != nil {
in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit
*out = new(int32)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppSetSpec.
func (in *YurtAppSetSpec) DeepCopy() *YurtAppSetSpec {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtAppSetSpec)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtAppSetStatus) DeepCopyInto(out *YurtAppSetStatus) {
	*out = *in
	// Each reference-holding field (pointer, slices, map) is re-allocated below so
	// that the copy shares no storage with the receiver.
	if in.CollisionCount != nil {
		in, out := &in.CollisionCount, &out.CollisionCount
		*out = new(int32)
		**out = **in
	}
	if in.Conditions != nil {
		in, out := &in.Conditions, &out.Conditions
		*out = make([]YurtAppSetCondition, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	if in.WorkloadSummaries != nil {
		in, out := &in.WorkloadSummaries, &out.WorkloadSummaries
		*out = make([]WorkloadSummary, len(*in))
		// Plain copy suffices here: WorkloadSummary is copied by value (no DeepCopyInto call).
		copy(*out, *in)
	}
	if in.PoolReplicas != nil {
		in, out := &in.PoolReplicas, &out.PoolReplicas
		*out = make(map[string]int32, len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtAppSetStatus.
func (in *YurtAppSetStatus) DeepCopy() *YurtAppSetStatus {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtAppSetStatus)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtStaticSet) DeepCopyInto(out *YurtStaticSet) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	// Status is assigned by value: YurtStaticSetStatus holds no reference fields (see its DeepCopyInto).
	out.Status = in.Status
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtStaticSet.
func (in *YurtStaticSet) DeepCopy() *YurtStaticSet {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtStaticSet)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtStaticSet) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	// Return literal nil (not a typed nil pointer) so the interface value itself is nil.
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtStaticSetList) DeepCopyInto(out *YurtStaticSetList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	// Re-allocate Items and deep-copy each element so the copy owns its storage.
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]YurtStaticSet, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtStaticSetList.
func (in *YurtStaticSetList) DeepCopy() *YurtStaticSetList {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtStaticSetList)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *YurtStaticSetList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	// Return literal nil (not a typed nil pointer) so the interface value itself is nil.
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtStaticSetSpec) DeepCopyInto(out *YurtStaticSetSpec) {
	*out = *in
	in.UpgradeStrategy.DeepCopyInto(&out.UpgradeStrategy)
	// RevisionHistoryLimit is an optional pointer; clone it so the copies don't alias.
	if in.RevisionHistoryLimit != nil {
		in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit
		*out = new(int32)
		**out = **in
	}
	in.Template.DeepCopyInto(&out.Template)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtStaticSetSpec.
func (in *YurtStaticSetSpec) DeepCopy() *YurtStaticSetSpec {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtStaticSetSpec)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtStaticSetStatus) DeepCopyInto(out *YurtStaticSetStatus) {
	// A plain assignment is a full copy here: the status holds no reference fields.
	*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtStaticSetStatus.
func (in *YurtStaticSetStatus) DeepCopy() *YurtStaticSetStatus {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtStaticSetStatus)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *YurtStaticSetUpgradeStrategy) DeepCopyInto(out *YurtStaticSetUpgradeStrategy) {
	*out = *in
	// MaxUnavailable is an optional pointer; clone it so the copies don't alias.
	if in.MaxUnavailable != nil {
		in, out := &in.MaxUnavailable, &out.MaxUnavailable
		*out = new(intstr.IntOrString)
		**out = **in
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new YurtStaticSetUpgradeStrategy.
func (in *YurtStaticSetUpgradeStrategy) DeepCopy() *YurtStaticSetUpgradeStrategy {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(YurtStaticSetUpgradeStrategy)
	in.DeepCopyInto(out)
	return out
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package v1beta1 contains API Schema definitions for the apps v1beta1 API group
// +kubebuilder:object:generate=true
// +groupName=apps.openyurt.io
package v1beta1
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/scheme"
)
var (
// GroupVersion is group version used to register these objects
GroupVersion = schema.GroupVersion{Group: "apps.openyurt.io", Version: "v1beta1"}
SchemeGroupVersion = GroupVersion
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
// AddToScheme adds the types in this group-version to the given scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
// Resource is required by pkg/client/listers/... It maps a resource name in
// this group-version to its GroupResource.
func Resource(resource string) schema.GroupResource {
	gvr := SchemeGroupVersion.WithResource(resource)
	return gvr.GroupResource()
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
/*
Implementing the hub method is pretty easy -- we just have to add an empty
method called `Hub()` to serve as a
[marker](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/conversion?tab=doc#Hub).
*/
// Hub marks this type as a conversion hub.
// Other NodePool API versions convert to and from this (v1beta1) version;
// the empty method is the marker controller-runtime's conversion package looks for.
func (*NodePool) Hub() {}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type NodePoolType string
const (
Edge NodePoolType = "Edge"
Cloud NodePoolType = "Cloud"
)
// NodePoolSpec defines the desired state of NodePool
type NodePoolSpec struct {
	// The type of the NodePool (Edge or Cloud).
	// +optional
	Type NodePoolType `json:"type,omitempty"`
	// HostNetwork is used to specify that cni components(like flannel)
	// will not be installed on the nodes of this NodePool.
	// This means all pods on the nodes of this NodePool will use
	// HostNetwork and share network namespace with host machine.
	HostNetwork bool `json:"hostNetwork,omitempty"`
	// If specified, the Labels will be added to all nodes.
	// NOTE: existing labels with same keys on the nodes will be overwritten.
	// +optional
	Labels map[string]string `json:"labels,omitempty"`
	// If specified, the Annotations will be added to all nodes.
	// NOTE: existing annotations with same keys on the nodes will be overwritten.
	// +optional
	Annotations map[string]string `json:"annotations,omitempty"`
	// If specified, the Taints will be added to all nodes.
	// +optional
	Taints []v1.Taint `json:"taints,omitempty"`
}
// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
	// Total number of ready nodes in the pool.
	// +optional
	ReadyNodeNum int32 `json:"readyNodeNum"`
	// Total number of unready nodes in the pool.
	// +optional
	UnreadyNodeNum int32 `json:"unreadyNodeNum"`
	// The list of nodes' names in the pool
	// +optional
	Nodes []string `json:"nodes,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:scope=Cluster,path=nodepools,shortName=np,categories=all
// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.type",description="The type of nodepool"
// +kubebuilder:printcolumn:name="ReadyNodes",type="integer",JSONPath=".status.readyNodeNum",description="The number of ready nodes in the pool"
// +kubebuilder:printcolumn:name="NotReadyNodes",type="integer",JSONPath=".status.unreadyNodeNum"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:subresource:status
// +genclient:nonNamespaced
// +kubebuilder:storageversion
// NodePool is the Schema for the nodepools API.
// It is cluster-scoped and is the storage version of the API (see the
// +kubebuilder:resource and +kubebuilder:storageversion markers above).
type NodePool struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              NodePoolSpec   `json:"spec,omitempty"`
	Status            NodePoolStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// NodePoolList contains a list of NodePool
type NodePoolList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []NodePool `json:"items"`
}
// init registers the NodePool types with the package scheme builder so that
// AddToScheme can install them into a runtime.Scheme.
func init() {
	SchemeBuilder.Register(&NodePool{}, &NodePoolList{})
}
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by controller-gen. DO NOT EDIT.
package v1beta1
import (
"k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePool) DeepCopyInto(out *NodePool) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	// ObjectMeta, Spec and Status hold reference fields, so each is deep-copied.
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	in.Spec.DeepCopyInto(&out.Spec)
	in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePool.
func (in *NodePool) DeepCopy() *NodePool {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(NodePool)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NodePool) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	// Return literal nil (not a typed nil pointer) so the interface value itself is nil.
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePoolList) DeepCopyInto(out *NodePoolList) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ListMeta.DeepCopyInto(&out.ListMeta)
	// Re-allocate Items and deep-copy each element so the copy owns its storage.
	if in.Items != nil {
		in, out := &in.Items, &out.Items
		*out = make([]NodePool, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePoolList.
func (in *NodePoolList) DeepCopy() *NodePoolList {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(NodePoolList)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *NodePoolList) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	// Return literal nil (not a typed nil pointer) so the interface value itself is nil.
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePoolSpec) DeepCopyInto(out *NodePoolSpec) {
	*out = *in
	// Maps and slices share storage after the shallow copy; rebuild each one below.
	if in.Labels != nil {
		in, out := &in.Labels, &out.Labels
		*out = make(map[string]string, len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
	if in.Annotations != nil {
		in, out := &in.Annotations, &out.Annotations
		*out = make(map[string]string, len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
	if in.Taints != nil {
		in, out := &in.Taints, &out.Taints
		*out = make([]v1.Taint, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePoolSpec.
func (in *NodePoolSpec) DeepCopy() *NodePoolSpec {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(NodePoolSpec)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodePoolStatus) DeepCopyInto(out *NodePoolStatus) {
	*out = *in
	// Nodes is a []string; a fresh slice plus copy is a full deep copy.
	if in.Nodes != nil {
		in, out := &in.Nodes, &out.Nodes
		*out = make([]string, len(*in))
		copy(*out, *in)
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodePoolStatus.
func (in *NodePoolStatus) DeepCopy() *NodePoolStatus {
	// A nil receiver yields nil.
	if in == nil {
		return nil
	}
	out := new(NodePoolStatus)
	in.DeepCopyInto(out)
	return out
}
/*
Copyright 2020 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package projectinfo
import (
"fmt"
"runtime"
"strings"
)
var (
projectPrefix = "yurt"
labelPrefix = "openyurt.io"
gitVersion = "v0.0.0"
gitCommit = "unknown"
buildDate = "1970-01-01T00:00:00Z"
maintainingVersions = "unknown"
separator = ","
nodePoolLabelKey = "apps.openyurt.io/nodepool"
)
func ShortAgentVersion() string {
commit := gitCommit
if len(gitCommit) > 7 {
commit = gitCommit[:7]
}
return GetAgentName() + "/" + gitVersion + "-" + commit
}
func ShortServerVersion() string {
commit := gitCommit
if len(gitCommit) > 7 {
commit = gitCommit[:7]
}
return GetServerName() + "/" + gitVersion + "-" + commit
}
// GetProjectPrefix returns the project prefix: yurt.
func GetProjectPrefix() string {
	return projectPrefix
}
// GetServerName returns the tunnel server name: yurttunnel-server.
func GetServerName() string {
	return projectPrefix + "tunnel-server"
}
// YurtTunnelServerLabel returns the tunnel server label: yurt-tunnel-server.
// TrimSuffix guards against a projectPrefix that already ends with "-",
// avoiding a doubled dash in the label.
func YurtTunnelServerLabel() string {
	return strings.TrimSuffix(projectPrefix, "-") + "-tunnel-server"
}
// GetAgentName returns the tunnel agent name: yurttunnel-agent.
func GetAgentName() string {
	return projectPrefix + "tunnel-agent"
}
// GetEdgeWorkerLabelKey returns the edge-worker label ("openyurt.io/is-edge-worker"),
// which is used to identify if a node is an edge node ("true") or a cloud node ("false").
func GetEdgeWorkerLabelKey() string {
	return labelPrefix + "/is-edge-worker"
}
// GetHubName returns the name of the yurthub agent: yurthub.
func GetHubName() string {
	return projectPrefix + "hub"
}
// GetEdgeEnableTunnelLabelKey returns the tunnel agent label ("openyurt.io/edge-enable-reverseTunnel-client"),
// which is used to identify if tunnel agent is running on the node or not.
func GetEdgeEnableTunnelLabelKey() string {
	return labelPrefix + "/edge-enable-reverseTunnel-client"
}
// GetTunnelName returns the name of the tunnel: yurttunnel.
func GetTunnelName() string {
	return projectPrefix + "tunnel"
}
// GetYurtManagerName returns the name of openyurt-manager: yurt-manager.
// Note: this is a fixed name and intentionally does not use projectPrefix.
func GetYurtManagerName() string {
	return "yurt-manager"
}
// GetAutonomyAnnotation returns annotation key for node autonomy
func GetAutonomyAnnotation() string {
return fmt.Sprintf("node.beta.%s/autonomy", labelPrefix)
}
// normalizeGitCommit shortens a git commit hash to at most 7 characters;
// commits that are already 7 characters or fewer are returned unchanged.
func normalizeGitCommit(commit string) string {
	const shortLen = 7
	if len(commit) <= shortLen {
		return commit
	}
	return commit[:shortLen]
}
// GetNodePoolLabel returns the label key for specifying the nodepool:
// "apps.openyurt.io/nodepool".
func GetNodePoolLabel() string {
	return nodePoolLabelKey
}
// Info contains version information.
type Info struct {
	GitVersion string `json:"gitVersion"` // semantic version, e.g. v0.0.0
	GitCommit  string `json:"gitCommit"`  // short (<=7 char) git commit hash
	BuildDate  string `json:"buildDate"`  // build timestamp, RFC3339
	GoVersion  string `json:"goVersion"`  // version of the Go toolchain that built the binary
	Compiler   string `json:"compiler"`   // compiler name (gc/gccgo)
	Platform   string `json:"platform"`   // "GOOS/GOARCH"
	// AllVersions lists the maintained versions, split from the
	// comma-separated maintainingVersions build variable.
	AllVersions      []string `json:"allVersions"`
	NodePoolLabelKey string   `json:"nodePoolLabelKey"` // label key used to bind nodes to a nodepool
}
// Get returns the overall codebase version. Most fields come from package-level
// variables that are expected to be overridden at build time via -ldflags;
// Go runtime details are filled in from the runtime package.
func Get() Info {
	return Info{
		GitVersion: gitVersion,
		GitCommit:  normalizeGitCommit(gitCommit),
		BuildDate:  buildDate,
		GoVersion:  runtime.Version(),
		Compiler:   runtime.Compiler,
		Platform:   fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH),
		// maintainingVersions is a comma-separated list baked in at build time.
		AllVersions:      strings.Split(maintainingVersions, separator),
		NodePoolLabelKey: nodePoolLabelKey,
	}
}
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"errors"
"fmt"
"sync"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// Compile-time assertion that Controller supports controller-runtime dependency injection.
var _ inject.Injector = &Controller{}
// Controller implements controller.Controller.
type Controller struct {
	// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
	Name string
	// MakeQueue constructs the queue for this controller once the controller is ready to start.
	// This exists because the standard Kubernetes workqueues start themselves immediately, which
	// leads to goroutine leaks if something calls controller.New repeatedly.
	MakeQueue func() workqueue.RateLimitingInterface
	// Queue is an listeningQueue that listens for events from Informers and adds object keys to
	// the Queue for processing
	Queue workqueue.RateLimitingInterface
	// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
	// Deprecated: the caller should handle injected fields itself.
	SetFields func(i interface{}) error
	// mu is used to synchronize Controller setup
	mu sync.Mutex
	// Started is true if the Controller has been Started
	Started bool
	// ctx is the context that was passed to Start() and used when starting watches.
	//
	// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
	// while we usually always strive to follow best practices, we consider this a legacy case and it should
	// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
	ctx context.Context
	// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
	// Defaults to 2 minutes if not set.
	CacheSyncTimeout time.Duration
	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
	// Watches registered before Start() accumulate here; Start() drains it to nil.
	startWatches []watchDescription
	// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
	RecoverPanic bool
}
// watchDescription contains all the information necessary to start a watch.
type watchDescription struct {
	src        source.Source       // event source to start
	handler    handler.EventHandler // handler that enqueues work for events
	predicates []predicate.Predicate // filters applied before the handler runs
}
// Watch implements controller.Controller.
// Before the controller is started, watches are stored on the struct and
// started later by Start(); afterwards the source is started immediately
// against the live queue.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Inject Cache into arguments
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}
	// Controller hasn't started yet, store the watches locally and return.
	//
	// These watches are going to be held on the controller struct until the manager or user calls Start(...).
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}
	klog.V(2).InfoS("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
// Start implements controller.Controller.
// It launches all watch sources registered via Watch, waits (bounded by
// CacheSyncTimeout per source) for their caches to sync, marks the controller
// as Started, and then blocks until ctx is cancelled.
func (c *Controller) Start(ctx context.Context) error {
	// use an IIFE to get proper lock handling
	// but lock outside to get proper handling of the queue shutdown
	c.mu.Lock()
	if c.Started {
		// Bug fix: release the mutex before returning. The previous code
		// returned with c.mu still held, which would deadlock every later
		// Watch/Start/WaitForStarted call on this controller.
		c.mu.Unlock()
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}
	// Set the internal context.
	c.ctx = ctx
	c.Queue = c.MakeQueue()
	go func() {
		// Shut the queue down as soon as the controller context is cancelled.
		<-ctx.Done()
		c.Queue.ShutDown()
	}()
	err := func() error {
		defer c.mu.Unlock()
		// TODO(pwittrock): Reconsider HandleCrash
		defer utilruntime.HandleCrash()
		// NB(directxman12): launch the sources *before* trying to wait for the
		// caches to sync so that they have a chance to register their intended
		// caches.
		for _, watch := range c.startWatches {
			klog.V(2).InfoS("Starting EventSource", "source", fmt.Sprintf("%s", watch.src), "controller", c.Name)
			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}
		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
		klog.V(2).InfoS("Starting Controller WatchSource", "controller", c.Name)
		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}
			if err := func() error {
				// use a context with timeout for launching sources and syncing caches.
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()
				// WaitForSync waits for a definitive timeout, and returns if there
				// is an error or a timeout
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("could not wait for %s caches to sync: %w", c.Name, err)
					klog.ErrorS(err, "Could not wait for Cache to sync")
					return err
				}
				return nil
			}(); err != nil {
				return err
			}
		}
		// All the watches have been started, we can reset the local slice.
		//
		// We should never hold watches more than necessary, each watch source can hold a backing cache,
		// which won't be garbage collected if we hold a reference to it.
		c.startWatches = nil
		klog.V(2).InfoS("No workers need to be started", "controller", c.Name)
		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}
	// Block until the context is cancelled; this controller runs no workers itself.
	<-ctx.Done()
	klog.V(2).InfoS("No Reconcile controller finished", "controller", c.Name)
	return nil
}
// InjectFunc implements inject.Injector (see the package-level interface
// assertion); the manager calls it to supply the SetFields dependency-injection
// function used by Watch.
func (c *Controller) InjectFunc(f inject.Func) error {
	c.SetFields = f
	return nil
}
// WaitForStarted polls (every 200ms) until the controller has been started or
// ctx is cancelled. It returns true once the controller is started and false
// when the context is cancelled first.
func (c *Controller) WaitForStarted(ctx context.Context) bool {
	err := wait.PollImmediateUntil(200*time.Millisecond, func() (bool, error) {
		c.mu.Lock()
		started := c.Started
		c.mu.Unlock()
		return started, nil
	}, ctx.Done())
	if err != nil {
		// Bug fix: the previous code passed printf-style format strings to the
		// structured klog.InfoS, which logs the directives literally and
		// mis-pairs the arguments; use proper key/value pairs instead.
		klog.V(2).InfoS("could not start controller", "controller", c.Name, "err", err)
		return false
	}
	klog.V(2).InfoS("controller started", "controller", c.Name)
	return true
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the License);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an AS IS BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"embed"
"encoding/json"
"path/filepath"
"gopkg.in/yaml.v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
var (
//go:embed EdgeXConfig
EdgeXFS embed.FS
folder = "EdgeXConfig/"
ManifestPath = filepath.Join(folder, "manifest.yaml")
securityFile = filepath.Join(folder, "config.json")
nosectyFile = filepath.Join(folder, "config-nosecty.json")
)
// EdgeXConfig is the top-level layout of the embedded EdgeX config files
// (config.json / config-nosecty.json): a list of per-version definitions.
type EdgeXConfig struct {
	Versions []*Version `yaml:"versions,omitempty" json:"versions,omitempty"`
}
// Version describes one EdgeX release: its name, the ConfigMaps it ships,
// and the components it is composed of.
type Version struct {
	Name       string             `yaml:"versionName" json:"versionName"`
	ConfigMaps []corev1.ConfigMap `yaml:"configMaps,omitempty" json:"configMaps,omitempty"`
	Components []*Component       `yaml:"components,omitempty" json:"components,omitempty"`
}
// Component describes a single EdgeX component as an optional Service spec
// plus an optional Deployment spec.
type Component struct {
	Name       string                 `yaml:"name" json:"name"`
	Service    *corev1.ServiceSpec    `yaml:"service,omitempty" json:"service,omitempty"`
	Deployment *appsv1.DeploymentSpec `yaml:"deployment,omitempty" json:"deployment,omitempty"`
}
// Manifest mirrors the embedded manifest.yaml: bookkeeping metadata plus the
// list of supported versions and their required components.
type Manifest struct {
	Updated       string            `yaml:"updated"`
	Count         int               `yaml:"count"`
	LatestVersion string            `yaml:"latestVersion"`
	Versions      []ManifestVersion `yaml:"versions"`
}
// ManifestVersion names one supported EdgeX version and the components that
// must be deployed for it.
type ManifestVersion struct {
	Name               string   `yaml:"name"`
	RequiredComponents []string `yaml:"requiredComponents"`
}
// ExtractVersionsName collects the names of every version listed in the
// manifest into a string set.
func ExtractVersionsName(manifest *Manifest) sets.String {
	names := sets.NewString()
	for i := range manifest.Versions {
		names.Insert(manifest.Versions[i].Name)
	}
	return names
}
// ExtractRequiredComponentsName returns the set of component names that the
// manifest marks as required for the given version. The set is empty when the
// version is not listed.
func ExtractRequiredComponentsName(manifest *Manifest, versionName string) sets.String {
	required := sets.NewString()
	for i := range manifest.Versions {
		if manifest.Versions[i].Name != versionName {
			continue
		}
		required.Insert(manifest.Versions[i].RequiredComponents...)
		break
	}
	return required
}
// PlatformAdminControllerConfiguration contains elements describing PlatformAdminController.
// The maps are keyed by EdgeX version name; "Security" holds the secure
// deployment variant and "NoSecty" the security-disabled variant.
type PlatformAdminControllerConfiguration struct {
	Manifest           Manifest
	SecurityComponents map[string][]*Component
	NoSectyComponents  map[string][]*Component
	SecurityConfigMaps map[string][]corev1.ConfigMap
	NoSectyConfigMaps  map[string][]corev1.ConfigMap
}
// NewPlatformAdminControllerConfiguration loads the embedded EdgeX manifest
// (YAML) and the security / no-security component configs (JSON), and indexes
// components and ConfigMaps by version name. It returns nil (after logging)
// when any embedded file cannot be read or parsed.
func NewPlatformAdminControllerConfiguration() *PlatformAdminControllerConfiguration {
	var (
		edgexconfig        = EdgeXConfig{}
		edgexnosectyconfig = EdgeXConfig{}
		conf               = PlatformAdminControllerConfiguration{
			Manifest:           Manifest{},
			SecurityComponents: make(map[string][]*Component),
			NoSectyComponents:  make(map[string][]*Component),
			SecurityConfigMaps: make(map[string][]corev1.ConfigMap),
			NoSectyConfigMaps:  make(map[string][]corev1.ConfigMap),
		}
	)
	// Read the EdgeX configuration file
	manifestContent, err := EdgeXFS.ReadFile(ManifestPath)
	if err != nil {
		// Message fixed for consistency with the sibling branches
		// (was the garbled "File to open the embed EdgeX manifest file").
		klog.Errorf("could not open the embed EdgeX manifest file: %v", err)
		return nil
	}
	securityContent, err := EdgeXFS.ReadFile(securityFile)
	if err != nil {
		klog.Errorf("could not open the embed EdgeX security config: %v", err)
		return nil
	}
	nosectyContent, err := EdgeXFS.ReadFile(nosectyFile)
	if err != nil {
		klog.Errorf("could not open the embed EdgeX nosecty config: %v", err)
		return nil
	}
	// Unmarshal the EdgeX configuration file
	if err := yaml.Unmarshal(manifestContent, &conf.Manifest); err != nil {
		// Message fixed for consistency (was "Error manifest EdgeX configuration file").
		klog.Errorf("could not unmarshal the embed EdgeX manifest file: %v", err)
		return nil
	}
	if err := json.Unmarshal(securityContent, &edgexconfig); err != nil {
		klog.Errorf("could not unmarshal the embed EdgeX security config: %v", err)
		return nil
	}
	// Index the secure variant by version name.
	for _, version := range edgexconfig.Versions {
		conf.SecurityComponents[version.Name] = version.Components
		conf.SecurityConfigMaps[version.Name] = version.ConfigMaps
	}
	if err := json.Unmarshal(nosectyContent, &edgexnosectyconfig); err != nil {
		klog.Errorf("could not unmarshal the embed EdgeX nosecty config: %v", err)
		return nil
	}
	// Index the security-disabled variant by version name.
	for _, version := range edgexnosectyconfig.Versions {
		conf.NoSectyComponents[version.Name] = version.Components
		conf.NoSectyConfigMaps[version.Name] = version.ConfigMaps
	}
	return &conf
}
/*
Copyright 2020 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package refmanager
import (
"context"
"fmt"
"reflect"
"sync"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// RefManager provides methods to claim ownership of objects (via label
// selector matching) on behalf of an owner, caching the one-time
// "can this owner still adopt?" check.
type RefManager struct {
	client    client.Client   // used to fetch the owner and update adoptees
	selector  labels.Selector // objects must match this selector to be claimed
	owner     metav1.Object   // the controller object claiming ownership
	ownerType reflect.Type    // concrete (non-pointer) type of the owner
	schema    *runtime.Scheme // scheme used to resolve the owner's kind; may be nil
	once       sync.Once // guards the single canAdopt evaluation
	canAdoptErr error     // memoized result of canAdopt
}
// New returns a RefManager that exposes
// methods to manage the controllerRef of pods.
func New(client client.Client, selector *metav1.LabelSelector, owner metav1.Object, schema *runtime.Scheme) (*RefManager, error) {
	sel, err := metav1.LabelSelectorAsSelector(selector)
	if err != nil {
		return nil, err
	}

	// Record the owner's concrete (element) type, unwrapping a pointer receiver.
	t := reflect.TypeOf(owner)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}

	mgr := &RefManager{
		client:    client,
		selector:  sel,
		owner:     owner,
		ownerType: t,
		schema:    schema,
	}
	return mgr, nil
}
// ClaimOwnedObjects tries to take ownership of a list of objects for this controller.
// An object is claimable when its labels match the selector and it passes
// every supplied filter; per-object errors are aggregated into the returned error.
func (mgr *RefManager) ClaimOwnedObjects(objs []metav1.Object, filters ...func(metav1.Object) bool) ([]metav1.Object, error) {
	matches := func(obj metav1.Object) bool {
		if !mgr.selector.Matches(labels.Set(obj.GetLabels())) {
			return false
		}
		for _, accept := range filters {
			if !accept(obj) {
				return false
			}
		}
		return true
	}

	claimed := []metav1.Object{}
	var errs []error
	for _, obj := range objs {
		ok, err := mgr.claimObject(obj, matches)
		switch {
		case err != nil:
			errs = append(errs, err)
		case ok:
			claimed = append(claimed, obj)
		}
	}
	return claimed, utilerrors.NewAggregate(errs)
}
// canAdoptOnce memoizes the result of canAdopt so the owner liveness check
// (which hits the API server) runs at most once per RefManager.
func (mgr *RefManager) canAdoptOnce() error {
	mgr.once.Do(func() {
		mgr.canAdoptErr = mgr.canAdopt()
	})
	return mgr.canAdoptErr
}
// getOwner fetches a fresh copy of the owner object from the cluster.
// It delegates to the package-level getOwner var so tests can stub it.
func (mgr *RefManager) getOwner() (runtime.Object, error) {
	return getOwner(mgr.owner, mgr.schema, mgr.client)
}
// getOwner fetches a fresh copy of owner from the API server. It is a
// package-level variable so unit tests can substitute a fake
// implementation.
var getOwner = func(owner metav1.Object, schema *runtime.Scheme, c client.Client) (client.Object, error) {
	src, ok := owner.(client.Object)
	if !ok {
		return nil, fmt.Errorf("could not convert %s/%s to runtime object", owner.GetNamespace(), owner.GetName())
	}
	// Resolve the owner's kind and allocate a new empty object of that
	// kind to receive the fresh copy.
	kinds, _, err := schema.ObjectKinds(src)
	if err != nil {
		return nil, err
	}
	fresh, err := schema.New(kinds[0])
	if err != nil {
		return nil, err
	}
	target, ok := fresh.(client.Object)
	if !ok {
		return nil, fmt.Errorf("could not convert %s/%s to client object", owner.GetNamespace(), owner.GetName())
	}
	key := client.ObjectKey{Namespace: owner.GetNamespace(), Name: owner.GetName()}
	return target, c.Get(context.TODO(), key, target)
}
// updateOwnee persists changes made to an owned object (e.g. modified
// owner references), delegating to the package-level updateOwnee variable
// so tests can stub the write.
func (mgr *RefManager) updateOwnee(object client.Object) error {
	return updateOwnee(object, mgr.client)
}
// updateOwnee writes the object back to the API server. It is a
// package-level variable so unit tests can substitute a fake
// implementation.
var updateOwnee = func(object client.Object, c client.Client) error {
	return c.Update(context.TODO(), object)
}
// canAdopt verifies that the owner still exists with the same UID and is
// not being deleted; adopting on behalf of a stale or dying owner would
// create dangling references.
func (mgr *RefManager) canAdopt() error {
	latest, err := mgr.getOwner()
	if err != nil {
		return err
	}
	latestObj, ok := latest.(metav1.Object)
	if !ok {
		return fmt.Errorf("expected k8s.io/apimachinery/pkg/apis/meta/v1.object when getting owner %v/%v UID %v",
			mgr.owner.GetNamespace(), mgr.owner.GetName(), mgr.owner.GetUID())
	}
	// A different UID means the owner was deleted and recreated under the
	// same name; our cached owner is stale.
	if latestObj.GetUID() != mgr.owner.GetUID() {
		return fmt.Errorf("original owner %v/%v is gone: got uid %v, wanted %v",
			mgr.owner.GetNamespace(), mgr.owner.GetName(), latestObj.GetUID(), mgr.owner.GetUID())
	}
	if ts := latestObj.GetDeletionTimestamp(); ts != nil {
		return fmt.Errorf("%v/%v has just been deleted at %v",
			mgr.owner.GetNamespace(), mgr.owner.GetName(), ts)
	}
	return nil
}
// adopt stamps mgr.owner as the controller owner reference on obj and
// persists the change. When no scheme is configured the owner reference
// cannot be constructed, so the write is skipped.
func (mgr *RefManager) adopt(obj metav1.Object) error {
	if err := mgr.canAdoptOnce(); err != nil {
		return fmt.Errorf("can't adopt Object %v/%v (%v): %v", obj.GetNamespace(), obj.GetName(), obj.GetUID(), err)
	}
	if mgr.schema == nil {
		return nil
	}
	if err := controllerutil.SetControllerReference(mgr.owner, obj, mgr.schema); err != nil {
		return fmt.Errorf("can't set Object %v/%v (%v) owner reference: %v", obj.GetNamespace(), obj.GetName(), obj.GetUID(), err)
	}
	cliObj, ok := obj.(client.Object)
	if !ok {
		return fmt.Errorf("can't update Object %v/%v (%v) owner reference: could not cast to runtime.Object", obj.GetNamespace(), obj.GetName(), obj.GetUID())
	}
	if err := mgr.updateOwnee(cliObj); err != nil {
		return fmt.Errorf("can't update Object %v/%v (%v) owner reference: %v", obj.GetNamespace(), obj.GetName(), obj.GetUID(), err)
	}
	return nil
}
// release removes mgr.owner's owner reference from obj (matched by UID)
// and persists the change, so the object is no longer controlled by this
// owner. It is a no-op when obj does not reference the owner.
//
// BUGFIX: the error messages previously said "Pod" although this manager
// handles arbitrary objects (copy-paste from a pod controller); they now
// say "Object", consistent with adopt().
func (mgr *RefManager) release(obj metav1.Object) error {
	// Locate the owner reference that points at our owner, by UID.
	idx := -1
	for i, ref := range obj.GetOwnerReferences() {
		if ref.UID == mgr.owner.GetUID() {
			idx = i
			break
		}
	}
	if idx == -1 {
		// Nothing references our owner; nothing to release.
		return nil
	}
	runtimeObj, ok := obj.(client.Object)
	if !ok {
		return fmt.Errorf("can't remove Object %v/%v (%v) owner reference: could not cast to runtime.Object", obj.GetNamespace(), obj.GetName(), obj.GetUID())
	}
	refs := obj.GetOwnerReferences()
	obj.SetOwnerReferences(append(refs[:idx], refs[idx+1:]...))
	if err := mgr.updateOwnee(runtimeObj); err != nil {
		return fmt.Errorf("can't remove Object %v/%v (%v) owner reference %v/%v (%v): %v",
			obj.GetNamespace(), obj.GetName(), obj.GetUID(), obj.GetNamespace(), obj.GetName(), mgr.owner.GetUID(), err)
	}
	return nil
}
// claimObject decides, for a single object, whether this owner should
// adopt it, keep it, release it, or ignore it. It returns true when the
// object is (or becomes) owned by mgr.owner and matches the selector.
// The ordering of the checks mirrors upstream Kubernetes' ClaimObject and
// must be preserved.
func (mgr *RefManager) claimObject(obj metav1.Object, match func(metav1.Object) bool) (bool, error) {
	controllerRef := metav1.GetControllerOf(obj)
	if controllerRef != nil {
		if controllerRef.UID != mgr.owner.GetUID() {
			// Owned by someone else. Ignore.
			return false, nil
		}
		if match(obj) {
			// We already own it and the selector matches.
			// Return true (successfully claimed) before checking deletion timestamp.
			// We're still allowed to claim things we already own while being deleted
			// because doing so requires taking no actions.
			return true, nil
		}
		// Owned by us but selector doesn't match.
		// Try to release, unless we're being deleted.
		if mgr.owner.GetDeletionTimestamp() != nil {
			return false, nil
		}
		if err := mgr.release(obj); err != nil {
			// If the pod no longer exists, ignore the error.
			if apierrors.IsNotFound(err) {
				return false, nil
			}
			// Either someone else released it, or there was a transient error.
			// The controller should requeue and try again if it's still stale.
			return false, err
		}
		// Successfully released.
		return false, nil
	}
	// It's an orphan.
	if mgr.owner.GetDeletionTimestamp() != nil || !match(obj) {
		// Ignore if we're being deleted or selector doesn't match.
		return false, nil
	}
	if obj.GetDeletionTimestamp() != nil {
		// Ignore if the object is being deleted
		return false, nil
	}
	// Selector matches. Try to adopt.
	if err := mgr.adopt(obj); err != nil {
		// If the pod no longer exists, ignore the error.
		if apierrors.IsNotFound(err) {
			return false, nil
		}
		// Either someone else claimed it first, or there was a transient error.
		// The controller should requeue and try again if it's still orphaned.
		return false, err
	}
	// Successfully adopted.
	return true, nil
}
/*
Copyright 2020 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"sync"
"time"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/integer"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
controllerimpl "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/internal/controller"
)
// SlowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently with its index.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func SlowStartBatch(count int, initialBatchSize int, fn func(index int) error) (int, error) {
	var (
		remaining = count
		successes = 0
		next      = 0 // global index passed to the next invocation of fn
	)
	for batch := integer.IntMin(remaining, initialBatchSize); batch > 0; batch = integer.IntMin(2*batch, remaining) {
		// Launch the whole batch concurrently; the channel is buffered so
		// no failing goroutine ever blocks on the send.
		errCh := make(chan error, batch)
		var wg sync.WaitGroup
		wg.Add(batch)
		for i := 0; i < batch; i++ {
			go func(idx int) {
				defer wg.Done()
				if err := fn(idx); err != nil {
					errCh <- err
				}
			}(next)
			next++
		}
		wg.Wait()
		successes += batch - len(errCh)
		// Abort on the first failing batch, surfacing one of its errors.
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batch
	}
	return successes, nil
}
// NewNoReconcileController creates an internal controller-runtime
// Controller, registers it with the manager, and returns it. Unlike the
// stock controller.New, this helper wires no reconcile loop: the caller
// is expected to configure watches and queue consumption itself.
// Defaults match controller-runtime: a 2-minute cache sync timeout and
// the default rate limiter.
func NewNoReconcileController(name string, mgr manager.Manager, options controller.Options) (*controllerimpl.Controller, error) {
	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}
	if options.Log == nil {
		options.Log = mgr.GetLogger()
	}
	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}
	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}
	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}
	// Create controller with dependencies set
	c := &controllerimpl.Controller{
		// MakeQueue defers queue construction so the queue is created when
		// the controller actually starts.
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		CacheSyncTimeout: options.CacheSyncTimeout,
		SetFields:        mgr.SetFields,
		Name:             name,
		RecoverPanic:     options.RecoverPanic,
	}
	// Register with the manager so the controller is started and stopped
	// together with it.
	if err := mgr.Add(c); err != nil {
		return c, err
	}
	return c, nil
}
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappdaemon
import (
"context"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
)
// EnqueueYurtAppDaemonForNodePool maps NodePool events to reconcile
// requests for every YurtAppDaemon in the cluster, since any pool change
// may affect which workloads each daemon must maintain.
type EnqueueYurtAppDaemonForNodePool struct {
	client client.Client // used to list all YurtAppDaemons on each event
}
// Create enqueues every YurtAppDaemon when a NodePool is created.
func (e *EnqueueYurtAppDaemonForNodePool) Create(event event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
	e.addAllYurtAppDaemonToWorkQueue(limitingInterface)
}

// Update enqueues every YurtAppDaemon when a NodePool is updated.
func (e *EnqueueYurtAppDaemonForNodePool) Update(event event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
	e.addAllYurtAppDaemonToWorkQueue(limitingInterface)
}

// Delete enqueues every YurtAppDaemon when a NodePool is deleted.
func (e *EnqueueYurtAppDaemonForNodePool) Delete(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
	e.addAllYurtAppDaemonToWorkQueue(limitingInterface)
}

// Generic intentionally ignores generic events; no YurtAppDaemon is
// enqueued for them.
func (e *EnqueueYurtAppDaemonForNodePool) Generic(event event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) {
	return
}
// addAllYurtAppDaemonToWorkQueue lists every YurtAppDaemon in the cluster
// and enqueues a reconcile request for each one. A failed List is silently
// dropped; the next NodePool event will trigger another attempt.
func (e *EnqueueYurtAppDaemonForNodePool) addAllYurtAppDaemonToWorkQueue(limitingInterface workqueue.RateLimitingInterface) {
	daemons := &v1alpha1.YurtAppDaemonList{}
	if err := e.client.List(context.TODO(), daemons); err != nil {
		return
	}
	for i := range daemons.Items {
		item := &daemons.Items[i]
		addYurtAppDaemonToWorkQueue(item.GetNamespace(), item.GetName(), limitingInterface)
	}
}
// Compile-time assertion that EnqueueYurtAppDaemonForNodePool implements
// handler.EventHandler.
var _ handler.EventHandler = &EnqueueYurtAppDaemonForNodePool{}
// addYurtAppDaemonToWorkQueue enqueues a reconcile request for the
// YurtAppDaemon identified by namespace/name.
func addYurtAppDaemonToWorkQueue(namespace, name string,
	q workqueue.RateLimitingInterface) {
	req := reconcile.Request{
		NamespacedName: types.NamespacedName{Namespace: namespace, Name: name},
	}
	q.Add(req)
}
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappdaemon
import (
"bytes"
"context"
"encoding/json"
"fmt"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
appsalphav1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util/refmanager"
)
// controlledHistories returns the ControllerRevisions controlled by yad,
// adopting matching orphans and releasing revisions that no longer match,
// via a RefManager.
//
// FIX: the loop variable was previously named "history", shadowing the
// imported k8s.io/kubernetes/pkg/controller/history package; renamed to
// avoid the shadow (flagged by importShadow linters).
func (r *ReconcileYurtAppDaemon) controlledHistories(yad *appsalphav1.YurtAppDaemon) ([]*apps.ControllerRevision, error) {
	// List all histories to include those that don't match the selector anymore
	// but have a ControllerRef pointing to the controller.
	selector, err := metav1.LabelSelectorAsSelector(yad.Spec.Selector)
	if err != nil {
		return nil, err
	}
	histories := &apps.ControllerRevisionList{}
	err = r.Client.List(context.TODO(), histories, &client.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	klog.V(1).Infof("List controller revision of YurtAppDaemon %s/%s: count %d\n", yad.Namespace, yad.Name, len(histories.Items))
	// Use ControllerRefManager to adopt/orphan as needed.
	cm, err := refmanager.New(r.Client, yad.Spec.Selector, yad, r.scheme)
	if err != nil {
		return nil, err
	}
	mts := make([]metav1.Object, len(histories.Items))
	for i, revision := range histories.Items {
		mts[i] = revision.DeepCopy()
	}
	claims, err := cm.ClaimOwnedObjects(mts)
	if err != nil {
		return nil, err
	}
	claimHistories := make([]*apps.ControllerRevision, len(claims))
	for i, mt := range claims {
		claimHistories[i] = mt.(*apps.ControllerRevision)
	}
	return claimHistories, nil
}
// constructYurtAppDaemonRevisions computes the current and update
// ControllerRevisions for yad, creating a new revision (or reusing an
// equivalent existing one) as needed, and returns them together with the
// (possibly incremented) hash-collision count. On error the collision
// count reflects yad.Status.CollisionCount (0 when unset).
func (r *ReconcileYurtAppDaemon) constructYurtAppDaemonRevisions(yad *appsalphav1.YurtAppDaemon) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
	var currentRevision, updateRevision *apps.ControllerRevision
	revisions, err := r.controlledHistories(yad)
	if err != nil {
		if yad.Status.CollisionCount == nil {
			return currentRevision, updateRevision, 0, err
		}
		return currentRevision, updateRevision, *yad.Status.CollisionCount, err
	}
	// Sorted ascending by Revision; required by nextRevision and the
	// "immediately prior" check below.
	history.SortControllerRevisions(revisions)
	cleanedRevision, err := r.cleanExpiredRevision(yad, &revisions)
	if err != nil {
		if yad.Status.CollisionCount == nil {
			return currentRevision, updateRevision, 0, err
		}
		return currentRevision, updateRevision, *yad.Status.CollisionCount, err
	}
	revisions = *cleanedRevision
	// Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
	// This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
	var collisionCount int32
	if yad.Status.CollisionCount != nil {
		collisionCount = *yad.Status.CollisionCount
	}
	// create a new revision from the current set
	updateRevision, err = r.newRevision(yad, nextRevision(revisions), &collisionCount)
	if err != nil {
		return nil, nil, collisionCount, err
	}
	// find any equivalent revisions
	equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
	equalCount := len(equalRevisions)
	revisionCount := len(revisions)
	if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
		// if the equivalent revision is immediately prior the update revision has not changed
		updateRevision = revisions[revisionCount-1]
	} else if equalCount > 0 {
		// if the equivalent revision is not immediately prior we will roll back by incrementing the
		// Revision of the equivalent revision
		equalRevisions[equalCount-1].Revision = updateRevision.Revision
		err := r.Client.Update(context.TODO(), equalRevisions[equalCount-1])
		if err != nil {
			return nil, nil, collisionCount, err
		}
		updateRevision = equalRevisions[equalCount-1]
	} else {
		//if there is no equivalent revision we create a new one
		updateRevision, err = r.createControllerRevision(yad, updateRevision, &collisionCount)
		if err != nil {
			return nil, nil, collisionCount, err
		}
	}
	// attempt to find the revision that corresponds to the current revision
	for i := range revisions {
		if revisions[i].Name == yad.Status.CurrentRevision {
			currentRevision = revisions[i]
		}
	}
	// if the current revision is nil we initialize the history by setting it to the update revision
	if currentRevision == nil {
		currentRevision = updateRevision
	}
	return currentRevision, updateRevision, collisionCount, nil
}
// cleanExpiredRevision deletes the oldest ControllerRevisions so that at
// most yad.Spec.RevisionHistoryLimit entries remain, and returns the
// trimmed (still sorted) list. The revision recorded as the current one
// is never deleted.
//
// BUGFIX: guard against a nil RevisionHistoryLimit (e.g. defaulting not
// applied yet); the previous code dereferenced the pointer
// unconditionally and would panic the reconciler.
func (r *ReconcileYurtAppDaemon) cleanExpiredRevision(yad *appsalphav1.YurtAppDaemon,
	sortedRevisions *[]*apps.ControllerRevision) (*[]*apps.ControllerRevision, error) {
	if yad.Spec.RevisionHistoryLimit == nil {
		return sortedRevisions, nil
	}
	exceedNum := len(*sortedRevisions) - int(*yad.Spec.RevisionHistoryLimit)
	if exceedNum <= 0 {
		return sortedRevisions, nil
	}
	// Revisions that must be kept regardless of age.
	live := map[string]bool{}
	live[yad.Status.CurrentRevision] = true
	for i, revision := range *sortedRevisions {
		if _, exist := live[revision.Name]; exist {
			continue
		}
		if i >= exceedNum {
			break
		}
		if err := r.Client.Delete(context.TODO(), revision); err != nil {
			return sortedRevisions, err
		}
	}
	// NOTE(review): the slice cut assumes the first exceedNum entries were
	// all deleted; a live revision inside that prefix is kept in the
	// cluster but dropped from the returned list — confirm this is
	// intended.
	cleanedRevisions := (*sortedRevisions)[exceedNum:]
	return &cleanedRevisions, nil
}
// createControllerRevision creates the controller revision owned by the parent.
// On a name collision with an existing revision holding different data it
// bumps collisionCount (which changes the hash, and thus the name) and
// retries until creation succeeds, an identical revision is found, or an
// unexpected error occurs.
func (r *ReconcileYurtAppDaemon) createControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) {
	if collisionCount == nil {
		return nil, fmt.Errorf("collisionCount should not be nil")
	}
	// Clone the input
	clone := revision.DeepCopy()
	var err error
	// Continue to attempt to create the revision updating the name with a new hash on each iteration
	for {
		// Hash incorporates the current collisionCount, so each retry
		// yields a fresh candidate name.
		hash := history.HashControllerRevision(revision, collisionCount)
		// Update the revisions name
		clone.Name = history.ControllerRevisionName(parent.GetName(), hash)
		err = r.Client.Create(context.TODO(), clone)
		if errors.IsAlreadyExists(err) {
			exists := &apps.ControllerRevision{}
			err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: parent.GetNamespace(), Name: clone.Name}, exists)
			if err != nil {
				return nil, err
			}
			// Identical payload: the revision already exists, reuse it.
			if bytes.Equal(exists.Data.Raw, clone.Data.Raw) {
				return exists, nil
			}
			// True hash collision: bump the count and retry with a new name.
			*collisionCount++
			continue
		}
		return clone, err
	}
}
// newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set.
// The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned
// ControllerRevision is valid. StatefulSet revisions are stored as patches that re-apply the current state of set
// to a new StatefulSet using a strategic merge patch to replace the saved state of the new StatefulSet.
func (r *ReconcileYurtAppDaemon) newRevision(ud *appsalphav1.YurtAppDaemon, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
	patch, err := getYurtAppDaemonPatch(ud)
	if err != nil {
		return nil, err
	}
	gvk, err := apiutil.GVKForObject(ud, r.scheme)
	if err != nil {
		return nil, err
	}
	// Take the labels from whichever workload template is configured;
	// exactly one of the two templates must be set.
	var tmplLabels map[string]string
	switch {
	case ud.Spec.WorkloadTemplate.StatefulSetTemplate != nil:
		tmplLabels = ud.Spec.WorkloadTemplate.StatefulSetTemplate.Labels
	case ud.Spec.WorkloadTemplate.DeploymentTemplate != nil:
		tmplLabels = ud.Spec.WorkloadTemplate.DeploymentTemplate.Labels
	default:
		klog.Errorf("YurtAppDaemon(%s/%s) need specific WorkloadTemplate", ud.GetNamespace(), ud.GetName())
		return nil, fmt.Errorf("YurtAppDaemon(%s/%s) need specific WorkloadTemplate", ud.GetNamespace(), ud.GetName())
	}
	cr, err := history.NewControllerRevision(
		ud,
		gvk,
		tmplLabels,
		runtime.RawExtension{Raw: patch},
		revision,
		collisionCount,
	)
	if err != nil {
		return nil, err
	}
	cr.Namespace = ud.Namespace
	return cr, nil
}
// nextRevision returns the revision number to assign to a new revision:
// one greater than the newest existing revision, or 1 when there are
// none. It assumes revisions is sorted ascending by Revision.
func nextRevision(revisions []*apps.ControllerRevision) int64 {
	if len(revisions) == 0 {
		return 1
	}
	return revisions[len(revisions)-1].Revision + 1
}
// getYurtAppDaemonPatch builds the JSON patch stored in each
// ControllerRevision: an object holding only spec.workloadTemplate,
// marked with "$patch": "replace" so applying it replaces the saved
// template wholesale.
func getYurtAppDaemonPatch(ud *appsalphav1.YurtAppDaemon) ([]byte, error) {
	marshaled, err := json.Marshal(ud)
	if err != nil {
		return nil, err
	}
	var full map[string]interface{}
	if err := json.Unmarshal(marshaled, &full); err != nil {
		return nil, err
	}
	// Extract spec.workloadTemplate and mark it as a full replacement.
	spec := full["spec"].(map[string]interface{})
	workload := spec["workloadTemplate"].(map[string]interface{})
	workload["$patch"] = "replace"
	patchObj := map[string]interface{}{
		"spec": map[string]interface{}{
			"workloadTemplate": workload,
		},
	}
	return json.Marshal(patchObj)
}
/*
Copyright 2022 The Openyurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappdaemon
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1helper "k8s.io/component-helpers/scheduling/corev1"
unitv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
)
// updateRetries bounds retry loops around workload update operations.
const updateRetries = 5
// IsTolerationsAllTaints reports whether the given tolerations tolerate
// every one of the given taints.
func IsTolerationsAllTaints(tolerations []corev1.Toleration, taints []corev1.Taint) bool {
	for i := range taints {
		tolerated := v1helper.TolerationsTolerateTaint(tolerations, &taints[i])
		if !tolerated {
			return false
		}
	}
	// Vacuously true when there are no taints.
	return true
}
// NewYurtAppDaemonCondition creates a new YurtAppDaemon condition with
// LastTransitionTime set to now.
func NewYurtAppDaemonCondition(condType unitv1alpha1.YurtAppDaemonConditionType, status corev1.ConditionStatus, reason, message string) *unitv1alpha1.YurtAppDaemonCondition {
	cond := unitv1alpha1.YurtAppDaemonCondition{
		Type:               condType,
		Status:             status,
		Reason:             reason,
		Message:            message,
		LastTransitionTime: metav1.Now(),
	}
	return &cond
}
// GetYurtAppDaemonCondition returns a pointer to a copy of the condition
// with the provided type, or nil when no such condition exists. Because a
// copy is returned, callers cannot mutate status.Conditions through the
// result.
func GetYurtAppDaemonCondition(status unitv1alpha1.YurtAppDaemonStatus, condType unitv1alpha1.YurtAppDaemonConditionType) *unitv1alpha1.YurtAppDaemonCondition {
	for _, cond := range status.Conditions {
		if cond.Type == condType {
			found := cond
			return &found
		}
	}
	return nil
}
// RemoveYurtAppDaemonCondition removes the YurtAppDaemon condition with the provided type.
// It is a no-op when no condition of that type is present.
func RemoveYurtAppDaemonCondition(status *unitv1alpha1.YurtAppDaemonStatus, condType unitv1alpha1.YurtAppDaemonConditionType) {
	status.Conditions = filterOutCondition(status.Conditions, condType)
}
// SetYurtAppDaemonCondition updates the YurtAppDaemon to include the provided condition. If the condition that
// we are about to add already exists and has the same status, reason and message then we are not going to update.
func SetYurtAppDaemonCondition(status *unitv1alpha1.YurtAppDaemonStatus, condition *unitv1alpha1.YurtAppDaemonCondition) {
	currentCond := GetYurtAppDaemonCondition(*status, condition.Type)
	// NOTE(review): only Status and Reason are compared here, so a change in
	// Message alone is dropped — confirm that is intended (the doc comment
	// above also mentions message).
	if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
		return
	}
	// Preserve the original transition time when the status itself has not
	// flipped.
	if currentCond != nil && currentCond.Status == condition.Status {
		condition.LastTransitionTime = currentCond.LastTransitionTime
	}
	// Replace any existing condition of this type with the new one.
	newConditions := filterOutCondition(status.Conditions, condition.Type)
	status.Conditions = append(newConditions, *condition)
}
// filterOutCondition returns conditions with every entry of type condType
// removed, preserving the relative order of the remaining entries.
func filterOutCondition(conditions []unitv1alpha1.YurtAppDaemonCondition, condType unitv1alpha1.YurtAppDaemonConditionType) []unitv1alpha1.YurtAppDaemonCondition {
	var kept []unitv1alpha1.YurtAppDaemonCondition
	for _, cond := range conditions {
		if cond.Type != condType {
			kept = append(kept, cond)
		}
	}
	return kept
}
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloadcontroller
import (
"context"
"errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/openyurtio/openyurt/pkg/apis/apps"
"github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util/refmanager"
)
// updateRetries is the maximum number of get/apply/update attempts made by
// UpdateWorkload before the last update error is returned.
const updateRetries = 5
// DeploymentControllor manages Deployment workloads on behalf of a
// YurtAppDaemon, one Deployment per NodePool.
// NOTE(review): "Controllor" is a typo for "Controller", but the type is
// exported and renaming it would break callers; kept as-is.
type DeploymentControllor struct {
	client.Client                  // embedded client for CRUD on Deployments
	Scheme        *runtime.Scheme  // scheme used to set owner references
}
// GetTemplateType returns the workload template type this controller
// handles (Deployment).
func (d *DeploymentControllor) GetTemplateType() v1alpha1.TemplateType {
	return v1alpha1.DeploymentTemplateType
}
// DeleteWorkload deletes the Deployment backing the given workload using
// background propagation, so dependent objects are garbage-collected
// asynchronously.
func (d *DeploymentControllor) DeleteWorkload(yda *v1alpha1.YurtAppDaemon, load *Workload) error {
	klog.Infof("YurtAppDaemon[%s/%s] prepare delete Deployment[%s/%s]", yda.GetNamespace(),
		yda.GetName(), load.Namespace, load.Name)
	ref := load.Spec.Ref.(runtime.Object)
	target, ok := ref.(client.Object)
	if !ok {
		return errors.New("could not convert runtime.Object to client.Object")
	}
	return d.Delete(context.TODO(), target, client.PropagationPolicy(metav1.DeletePropagationBackground))
}
// ApplyTemplate updates the object to the latest revision, depending on the YurtAppDaemon.
// It fills in labels, annotations, namespace, generate-name and spec from
// yad's DeploymentTemplate, pins the workload to the given nodepool via
// nodeSelector and tolerations, and sets yad as the controller owner.
//
// BUGFIX: the required-node-affinity reset below previously checked only
// Affinity != nil and then dereferenced Affinity.NodeAffinity, panicking
// on templates that set Affinity (e.g. pod affinity only) without
// NodeAffinity; the nil check on NodeAffinity is now included.
func (d *DeploymentControllor) ApplyTemplate(scheme *runtime.Scheme, yad *v1alpha1.YurtAppDaemon, nodepool v1alpha1.NodePool, revision string, set *appsv1.Deployment) error {
	if set.Labels == nil {
		set.Labels = map[string]string{}
	}
	for k, v := range yad.Spec.WorkloadTemplate.DeploymentTemplate.Labels {
		set.Labels[k] = v
	}
	for k, v := range yad.Spec.Selector.MatchLabels {
		set.Labels[k] = v
	}
	set.Labels[apps.ControllerRevisionHashLabelKey] = revision
	set.Labels[apps.PoolNameLabelKey] = nodepool.GetName()
	if set.Annotations == nil {
		set.Annotations = map[string]string{}
	}
	for k, v := range yad.Spec.WorkloadTemplate.DeploymentTemplate.Annotations {
		set.Annotations[k] = v
	}
	set.Annotations[apps.AnnotationRefNodePool] = nodepool.GetName()
	set.Namespace = yad.GetNamespace()
	set.GenerateName = getWorkloadPrefix(yad.GetName(), nodepool.GetName())
	set.Spec = *yad.Spec.WorkloadTemplate.DeploymentTemplate.Spec.DeepCopy()
	set.Spec.Selector.MatchLabels[apps.PoolNameLabelKey] = nodepool.GetName()
	// set RequiredDuringSchedulingIgnoredDuringExecution nil: placement is
	// driven by the nodepool nodeSelector set below.
	if set.Spec.Template.Spec.Affinity != nil && set.Spec.Template.Spec.Affinity.NodeAffinity != nil &&
		set.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
		set.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = nil
	}
	if set.Spec.Template.Labels == nil {
		set.Spec.Template.Labels = map[string]string{}
	}
	set.Spec.Template.Labels[apps.PoolNameLabelKey] = nodepool.GetName()
	set.Spec.Template.Labels[apps.ControllerRevisionHashLabelKey] = revision
	// use nodeSelector to pin pods to the nodepool's nodes
	set.Spec.Template.Spec.NodeSelector = CreateNodeSelectorByNodepoolName(nodepool.GetName())
	// tolerate the nodepool's taints so pods can schedule onto its nodes
	nodePoolTaints := TaintsToTolerations(nodepool.Spec.Taints)
	set.Spec.Template.Spec.Tolerations = append(set.Spec.Template.Spec.Tolerations, nodePoolTaints...)
	if err := controllerutil.SetControllerReference(yad, set, scheme); err != nil {
		return err
	}
	return nil
}
// ObjectKey returns the namespaced name used to look up the Deployment
// backing the given workload.
func (d *DeploymentControllor) ObjectKey(load *Workload) client.ObjectKey {
	return types.NamespacedName{
		Namespace: load.Namespace,
		Name: load.Name,
	}
}
// UpdateWorkload re-renders the Deployment for the given workload from
// the latest template/revision and writes it back, retrying up to
// updateRetries times on update failure. The last update error (if any)
// is returned; get/apply errors abort immediately.
func (d *DeploymentControllor) UpdateWorkload(load *Workload, yad *v1alpha1.YurtAppDaemon, nodepool v1alpha1.NodePool, revision string) error {
	klog.Infof("YurtAppDaemon[%s/%s] prepare update Deployment[%s/%s]", yad.GetNamespace(),
		yad.GetName(), load.Namespace, load.Name)
	deploy := &appsv1.Deployment{}
	var lastErr error
	for attempt := 0; attempt < updateRetries; attempt++ {
		// Re-read the current object each attempt so the update is applied
		// on top of the newest resourceVersion.
		if err := d.Client.Get(context.TODO(), d.ObjectKey(load), deploy); err != nil {
			return err
		}
		if err := d.ApplyTemplate(d.Scheme, yad, nodepool, revision, deploy); err != nil {
			return err
		}
		lastErr = d.Client.Update(context.TODO(), deploy)
		if lastErr == nil {
			break
		}
	}
	return lastErr
}
// CreateWorkload renders a fresh Deployment for the given nodepool from
// yad's template and creates it in the cluster.
func (d *DeploymentControllor) CreateWorkload(yad *v1alpha1.YurtAppDaemon, nodepool v1alpha1.NodePool, revision string) error {
	klog.Infof("YurtAppDaemon[%s/%s] prepare create new deployment by nodepool %s ", yad.GetNamespace(), yad.GetName(), nodepool.GetName())
	var newDeploy appsv1.Deployment
	err := d.ApplyTemplate(d.Scheme, yad, nodepool, revision, &newDeploy)
	if err != nil {
		klog.Errorf("YurtAppDaemon[%s/%s] could not apply template, when create deployment: %v", yad.GetNamespace(),
			yad.GetName(), err)
		return err
	}
	return d.Client.Create(context.TODO(), &newDeploy)
}
// GetAllWorkloads returns all Deployments controlled by yad — adopting
// matching orphans and releasing stale ones via a RefManager — converted
// into the generic Workload representation.
func (d *DeploymentControllor) GetAllWorkloads(yad *v1alpha1.YurtAppDaemon) ([]*Workload, error) {
	selector, err := metav1.LabelSelectorAsSelector(yad.Spec.Selector)
	if err != nil {
		return nil, err
	}
	// List all Deployments matching the selector, including ones whose
	// labels drifted but which still carry a ControllerRef to this owner.
	deployList := appsv1.DeploymentList{}
	if err := d.Client.List(context.TODO(), &deployList, &client.ListOptions{LabelSelector: selector}); err != nil {
		return nil, err
	}
	refMgr, err := refmanager.New(d.Client, yad.Spec.Selector, yad, d.Scheme)
	if err != nil {
		return nil, err
	}
	candidates := make([]metav1.Object, 0, len(deployList.Items))
	for i := 0; i < len(deployList.Items); i++ {
		// Copy the element so each appended pointer has its own backing value.
		item := deployList.Items[i]
		candidates = append(candidates, &item)
	}
	claimed, err := refMgr.ClaimOwnedObjects(candidates)
	if err != nil {
		return nil, err
	}
	workloads := make([]*Workload, 0, len(claimed))
	for _, o := range claimed {
		deploy := o.(*appsv1.Deployment)
		// Surface the Deployment's Available condition status, if present.
		var availableCondition corev1.ConditionStatus
		for _, cond := range deploy.Status.Conditions {
			if cond.Type == appsv1.DeploymentAvailable {
				availableCondition = cond.Status
				break
			}
		}
		workloads = append(workloads, &Workload{
			Name:      o.GetName(),
			Namespace: o.GetNamespace(),
			Kind:      deploy.Kind,
			Spec: WorkloadSpec{
				Ref:          o,
				NodeSelector: deploy.Spec.Template.Spec.NodeSelector,
				Tolerations:  deploy.Spec.Template.Spec.Tolerations,
			},
			Status: WorkloadStatus{
				Replicas:           deploy.Status.Replicas,
				ReadyReplicas:      deploy.Status.ReadyReplicas,
				AvailableCondition: availableCondition,
			},
		})
	}
	return workloads, nil
}
// Compile-time assertion that DeploymentControllor implements WorkloadController.
var _ WorkloadController = &DeploymentControllor{}
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloadcontroller
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
"github.com/openyurtio/openyurt/pkg/projectinfo"
)
// getWorkloadPrefix builds the GenerateName prefix "<controller>-<pool>-"
// for a workload; when that combined prefix is not a valid DNS subdomain
// it falls back to "<controller>-" alone.
func getWorkloadPrefix(controllerName, nodepoolName string) string {
	combined := fmt.Sprintf("%s-%s-", controllerName, nodepoolName)
	if len(validation.NameIsDNSSubdomain(combined, true)) == 0 {
		return combined
	}
	return fmt.Sprintf("%s-", controllerName)
}
// CreateNodeSelectorByNodepoolName returns a node selector that matches
// only nodes labeled as members of the given nodepool.
func CreateNodeSelectorByNodepoolName(nodepool string) map[string]string {
	return map[string]string{
		projectinfo.GetNodePoolLabel(): nodepool,
	}
}
// TaintsToTolerations converts each taint into a toleration that matches
// it with the Exists operator (any value of the taint key is tolerated).
// Fixes: local variable typo ("toleation") and pre-sizes the result slice
// since the final length is known.
func TaintsToTolerations(taints []corev1.Taint) []corev1.Toleration {
	tolerations := make([]corev1.Toleration, 0, len(taints))
	for _, taint := range taints {
		toleration := corev1.Toleration{
			Key:      taint.Key,
			Operator: corev1.TolerationOpExists,
			Effect:   taint.Effect,
		}
		tolerations = append(tolerations, toleration)
	}
	return tolerations
}
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloadcontroller
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unitv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps"
)
// Workload is a controller-agnostic view of one pool-specific workload
// (e.g. a Deployment) managed by a YurtAppDaemon.
type Workload struct {
	Name string      // name of the underlying workload object
	Namespace string // namespace of the underlying workload object
	Kind string      // kind of the underlying workload object
	Spec WorkloadSpec
	Status WorkloadStatus
}
// WorkloadSpec stores the spec details of the workload
type WorkloadSpec struct {
	Ref metav1.Object                 // reference to the actual workload object (e.g. *appsv1.Deployment)
	Tolerations []corev1.Toleration   // tolerations of the workload's pod template
	NodeSelector map[string]string    // node selector of the workload's pod template
}
// WorkloadStatus stores the observed state of the Workload.
type WorkloadStatus struct {
Replicas int32 // desired replicas reported by the workload
ReadyReplicas int32 // replicas currently ready
AvailableCondition corev1.ConditionStatus // status of the workload's Available condition
}
// GetRevision returns the controller-revision hash recorded in the labels of
// the referenced workload object (empty string when the label is absent).
func (w *Workload) GetRevision() string {
return w.Spec.Ref.GetLabels()[unitv1alpha1.ControllerRevisionHashLabelKey]
}
// GetNodePoolName returns the nodepool this workload belongs to, read from the
// ref-nodepool annotation on the referenced object ("" when unset).
func (w *Workload) GetNodePoolName() string {
return w.Spec.Ref.GetAnnotations()[unitv1alpha1.AnnotationRefNodePool]
}
// GetToleration returns the tolerations of the workload's pod template.
func (w *Workload) GetToleration() []corev1.Toleration {
return w.Spec.Tolerations
}
// GetNodeSelector returns the node selector of the workload's pod template.
func (w *Workload) GetNodeSelector() map[string]string {
return w.Spec.NodeSelector
}
// GetKind returns the workload kind (e.g. "Deployment").
func (w *Workload) GetKind() string {
return w.Kind
}
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappdaemon
import (
"context"
"flag"
"fmt"
"reflect"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/openyurtio/openyurt/cmd/yurt-manager/app/config"
"github.com/openyurtio/openyurt/cmd/yurt-manager/names"
unitv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappdaemon/workloadcontroller"
)
var (
// concurrentReconciles is the max number of concurrent reconcile workers;
// overridable via the --yurtappdaemon-workers flag registered in init().
concurrentReconciles = 3
// controllerResource is the GVR used to probe whether the YurtAppDaemon CRD
// is installed before the controller is registered with the manager.
controllerResource = unitv1alpha1.SchemeGroupVersion.WithResource("yurtappdaemons")
)
const (
// slowStartInitialBatchSize is the first batch size handed to
// util.SlowStartBatch when creating/updating workloads.
slowStartInitialBatchSize = 1
// Event reason fragments recorded on the YurtAppDaemon object; they are
// prefixed with "Failed"/"Successful " at the call sites.
eventTypeRevisionProvision = "RevisionProvision"
eventTypeTemplateController = "TemplateController"
eventTypeWorkloadsCreated = "CreateWorkload"
eventTypeWorkloadsUpdated = "UpdateWorkload"
eventTypeWorkloadsDeleted = "DeleteWorkload"
)
// init registers the command-line flag controlling reconcile concurrency.
func init() {
flag.IntVar(&concurrentReconciles, "yurtappdaemon-workers", concurrentReconciles, "Max concurrent workers for YurtAppDaemon controller.")
}
// Format renders the given format string with args and prefixes the result
// with the YurtAppDaemon controller name ("<controller>: <message>").
func Format(format string, args ...interface{}) string {
    msg := fmt.Sprintf(format, args...)
    return names.YurtAppDaemonController + ": " + msg
}
// Add creates a new YurtAppDaemon Controller and adds it to the Manager with
// default RBAC. The Manager will set fields on the Controller and start it
// when the Manager is started. It bails out early when the YurtAppDaemon CRD
// is not installed in the cluster.
func Add(ctx context.Context, c *config.CompletedConfig, mgr manager.Manager) error {
    _, err := mgr.GetRESTMapper().KindFor(controllerResource)
    if err != nil {
        klog.Infof("resource %s doesn't exist", controllerResource.String())
        return err
    }
    klog.Infof("yurtappdaemon-controller add controller %s", controllerResource.String())
    return add(mgr, newReconciler(mgr))
}
// add adds a new Controller to mgr with r as the reconcile.Reconciler. It
// watches YurtAppDaemon objects directly and NodePool objects through the
// EnqueueYurtAppDaemonForNodePool mapper.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
    opts := controller.Options{Reconciler: r, MaxConcurrentReconciles: concurrentReconciles}
    c, err := controller.New(names.YurtAppDaemonController, mgr, opts)
    if err != nil {
        return err
    }
    // Watch for changes to YurtAppDaemon.
    if err := c.Watch(&source.Kind{Type: &unitv1alpha1.YurtAppDaemon{}}, &handler.EnqueueRequestForObject{}); err != nil {
        return err
    }
    // Watch for changes to NodePool.
    if err := c.Watch(&source.Kind{Type: &unitv1alpha1.NodePool{}}, &EnqueueYurtAppDaemonForNodePool{client: mgr.GetClient()}); err != nil {
        return err
    }
    return nil
}
var _ reconcile.Reconciler = &ReconcileYurtAppDaemon{} // compile-time interface conformance check
// ReconcileYurtAppDaemon reconciles a YurtAppDaemon object
type ReconcileYurtAppDaemon struct {
client.Client // client for reading/writing cluster objects
scheme *runtime.Scheme // scheme for (de)serialization and owner references
recorder record.EventRecorder // recorder for events on YurtAppDaemon objects
controls map[unitv1alpha1.TemplateType]workloadcontroller.WorkloadController // workload controller per template type
}
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileYurtAppDaemon{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(names.YurtAppDaemonController),
// Only Deployment templates are wired up today; the StatefulSet control
// below is intentionally left disabled.
controls: map[unitv1alpha1.TemplateType]workloadcontroller.WorkloadController{
// unitv1alpha1.StatefulSetTemplateType: &StatefulSetControllor{Client: mgr.GetClient(), scheme: mgr.GetScheme()},
unitv1alpha1.DeploymentTemplateType: &workloadcontroller.DeploymentControllor{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
},
}
}
// +kubebuilder:rbac:groups=apps.openyurt.io,resources=yurtappdaemons,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.openyurt.io,resources=yurtappdaemons/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
// Reconcile reads that state of the cluster for a YurtAppDaemon object and makes changes based on the state read
// and what is in the YurtAppDaemon.Spec
func (r *ReconcileYurtAppDaemon) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    klog.V(4).Infof("Reconcile YurtAppDaemon %s/%s", request.Namespace, request.Name)

    // Fetch the YurtAppDaemon instance.
    // FIX: use the reconcile context instead of context.TODO() so the read is
    // cancelled together with the controller.
    instance := &unitv1alpha1.YurtAppDaemon{}
    err := r.Get(ctx, request.NamespacedName, instance)
    if err != nil {
        if errors.IsNotFound(err) {
            // Object deleted after the request was enqueued; nothing to do.
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }
    if instance.DeletionTimestamp != nil {
        // Deletion in progress; owned workloads are garbage-collected via owner refs.
        return reconcile.Result{}, nil
    }

    oldStatus := instance.Status.DeepCopy()

    // Build (or reuse) the controller revisions describing the template history.
    currentRevision, updatedRevision, collisionCount, err := r.constructYurtAppDaemonRevisions(instance)
    if err != nil {
        klog.Errorf("could not construct controller revision of YurtAppDaemon %s/%s: %s", instance.Namespace, instance.Name, err)
        r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeRevisionProvision), err.Error())
        return reconcile.Result{}, err
    }
    // Prefer the updated revision when one exists.
    expectedRevision := currentRevision
    if updatedRevision != nil {
        expectedRevision = updatedRevision
    }
    klog.Infof("YurtAppDaemon [%s/%s] get expectRevision %v collisionCount %v", instance.GetNamespace(), instance.GetName(),
        expectedRevision.Name, collisionCount)

    control, templateType, err := r.getTemplateControls(instance)
    if err != nil {
        r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeTemplateController), err.Error())
        return reconcile.Result{}, err
    }
    if control == nil {
        // FIX: the event message was built with a no-op fmt.Sprintf (constant
        // string, no args); pass the string directly.
        r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("YurtAppDaemon[%s/%s] could not get control", instance.Namespace, instance.Name), "could not find control")
        return reconcile.Result{}, fmt.Errorf("could not find control")
    }

    // Index the existing workloads by the nodepool they serve.
    currentNPToWorkload, err := r.getNodePoolToWorkLoad(instance, control)
    if err != nil {
        klog.Errorf("YurtAppDaemon[%s/%s] could not get nodePoolWorkload, error: %s", instance.Namespace, instance.Name, err)
        return reconcile.Result{}, err
    }
    // Index the nodepools currently selected by this daemon.
    allNameToNodePools, err := r.getNameToNodePools(instance)
    if err != nil {
        klog.Errorf("YurtAppDaemon[%s/%s] could not get nameToNodePools, error: %s", instance.Namespace, instance.Name, err)
        return reconcile.Result{}, err
    }

    // Create/update/delete workloads so they match the selected nodepools.
    newStatus, err := r.manageWorkloads(instance, currentNPToWorkload, allNameToNodePools, expectedRevision.Name, templateType)
    if err != nil {
        return reconcile.Result{}, err
    }
    return r.updateStatus(instance, newStatus, oldStatus, expectedRevision, collisionCount, templateType, currentNPToWorkload)
}
// updateStatus recomputes the YurtAppDaemon status from the reconcile results
// and persists it when it differs from the previously observed status.
func (r *ReconcileYurtAppDaemon) updateStatus(instance *unitv1alpha1.YurtAppDaemon, newStatus, oldStatus *unitv1alpha1.YurtAppDaemonStatus,
    currentRevision *appsv1.ControllerRevision, collisionCount int32, templateType unitv1alpha1.TemplateType, currentNodepoolToWorkload map[string]*workloadcontroller.Workload) (reconcile.Result, error) {
    calculated := r.calculateStatus(instance, newStatus, currentRevision, collisionCount, templateType, currentNodepoolToWorkload)
    if _, err := r.updateYurtAppDaemon(instance, oldStatus, calculated); err != nil {
        return reconcile.Result{}, err
    }
    return reconcile.Result{}, nil
}
// updateYurtAppDaemon persists newStatus on yad. It short-circuits when no
// observable field changed, and retries conflict failures by re-reading the
// object up to updateRetries times (updateRetries is declared elsewhere in
// this package). Returns the updated object on success, or the last error.
func (r *ReconcileYurtAppDaemon) updateYurtAppDaemon(yad *unitv1alpha1.YurtAppDaemon, oldStatus, newStatus *unitv1alpha1.YurtAppDaemonStatus) (*unitv1alpha1.YurtAppDaemon, error) {
// Skip the API call entirely when the status is semantically unchanged.
if oldStatus.CurrentRevision == newStatus.CurrentRevision &&
*oldStatus.CollisionCount == *newStatus.CollisionCount &&
oldStatus.TemplateType == newStatus.TemplateType &&
yad.Generation == newStatus.ObservedGeneration &&
reflect.DeepEqual(oldStatus.NodePools, newStatus.NodePools) &&
reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) {
klog.Infof("YurtAppDaemon[%s/%s] oldStatus==newStatus, no need to update status", yad.GetNamespace(), yad.GetName())
return yad, nil
}
newStatus.ObservedGeneration = yad.Generation
var getErr, updateErr error
// Retry loop: write the status, and on failure refresh the object and try
// again until updateRetries attempts are exhausted.
for i, obj := 0, yad; ; i++ {
klog.V(4).Infof(fmt.Sprintf("YurtAppDaemon[%s/%s] The %d th time updating status for %v[%s/%s], ",
yad.GetNamespace(), yad.GetName(), i, obj.Kind, obj.Namespace, obj.Name) +
fmt.Sprintf("sequence No: %v->%v", obj.Status.ObservedGeneration, newStatus.ObservedGeneration))
obj.Status = *newStatus
updateErr = r.Client.Status().Update(context.TODO(), obj)
if updateErr == nil {
return obj, nil
}
if i >= updateRetries {
break
}
// Refresh the object so the next attempt carries the latest resourceVersion.
tmpObj := &unitv1alpha1.YurtAppDaemon{}
if getErr = r.Client.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, tmpObj); getErr != nil {
return nil, getErr
}
obj = tmpObj
}
klog.Errorf("could not update YurtAppDaemon %s/%s status: %s", yad.Namespace, yad.Name, updateErr)
return nil, updateErr
}
// calculateStatus fills newStatus with the collision count, an OverriderRef to
// the first YurtAppOverrider targeting this daemon, per-workload summaries,
// the current revision/template type, and a WorkLoadFailure condition when
// listing overriders failed. It mutates and returns newStatus.
func (r *ReconcileYurtAppDaemon) calculateStatus(instance *unitv1alpha1.YurtAppDaemon, newStatus *unitv1alpha1.YurtAppDaemonStatus,
currentRevision *appsv1.ControllerRevision, collisionCount int32, templateType unitv1alpha1.TemplateType, currentNodepoolToWorkload map[string]*workloadcontroller.Workload) *unitv1alpha1.YurtAppDaemonStatus {
newStatus.CollisionCount = &collisionCount
var workloadFailure string
overriderList := unitv1alpha1.YurtAppOverriderList{}
// A failed list is recorded as a condition below, not returned as an error.
if err := r.List(context.TODO(), &overriderList); err != nil {
workloadFailure = fmt.Sprintf("unable to list yurtappoverrider: %v", err)
}
// Reference the first overrider whose subject matches this YurtAppDaemon.
for _, overrider := range overriderList.Items {
if overrider.Subject.Kind == "YurtAppDaemon" && overrider.Subject.Name == instance.Name {
newStatus.OverriderRef = overrider.Name
break
}
}
// Summarize replica counts and availability of every managed workload.
newStatus.WorkloadSummaries = make([]unitv1alpha1.WorkloadSummary, 0)
for _, workload := range currentNodepoolToWorkload {
newStatus.WorkloadSummaries = append(newStatus.WorkloadSummaries, unitv1alpha1.WorkloadSummary{
AvailableCondition: workload.Status.AvailableCondition,
Replicas: workload.Status.Replicas,
ReadyReplicas: workload.Status.ReadyReplicas,
WorkloadName: workload.Name,
})
}
if newStatus.CurrentRevision == "" {
// init with current revision
newStatus.CurrentRevision = currentRevision.Name
}
newStatus.TemplateType = templateType
if workloadFailure == "" {
RemoveYurtAppDaemonCondition(newStatus, unitv1alpha1.WorkLoadFailure)
} else {
SetYurtAppDaemonCondition(newStatus, NewYurtAppDaemonCondition(unitv1alpha1.WorkLoadFailure, corev1.ConditionFalse, "Error", workloadFailure))
}
return newStatus
}
// manageWorkloads drives the per-nodepool workloads of instance toward the
// expected revision: it records the selected nodepools in the status, deletes
// and creates workloads via manageWorkloadsProvision, and updates stale ones
// in slow-start batches. Provision/update conditions are set on the returned
// status, which always reflects the attempted changes even on error.
func (r *ReconcileYurtAppDaemon) manageWorkloads(instance *unitv1alpha1.YurtAppDaemon, currentNodepoolToWorkload map[string]*workloadcontroller.Workload,
    allNameToNodePools map[string]unitv1alpha1.NodePool, expectedRevision string, templateType unitv1alpha1.TemplateType) (newStatus *unitv1alpha1.YurtAppDaemonStatus, updateErr error) {
    newStatus = instance.Status.DeepCopy()

    // Record every nodepool currently selected by the daemon.
    nps := make([]string, 0, len(allNameToNodePools))
    for np := range allNameToNodePools {
        nps = append(nps, np)
    }
    newStatus.NodePools = nps

    needDeleted, needUpdate, needCreate := r.classifyWorkloads(instance, currentNodepoolToWorkload, allNameToNodePools, expectedRevision)
    provision, err := r.manageWorkloadsProvision(instance, allNameToNodePools, expectedRevision, templateType, needDeleted, needCreate)
    if err != nil {
        SetYurtAppDaemonCondition(newStatus, NewYurtAppDaemonCondition(unitv1alpha1.WorkLoadProvisioned, corev1.ConditionFalse, "Error", err.Error()))
        return newStatus, fmt.Errorf("could not manage workload provision: %v", err)
    }
    if provision {
        SetYurtAppDaemonCondition(newStatus, NewYurtAppDaemonCondition(unitv1alpha1.WorkLoadProvisioned, corev1.ConditionTrue, "", ""))
    }

    if len(needUpdate) > 0 {
        _, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error {
            u := needUpdate[index]
            updateWorkloadErr := r.controls[templateType].UpdateWorkload(u, instance, allNameToNodePools[u.GetNodePoolName()], expectedRevision)
            if updateWorkloadErr != nil {
                r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", eventTypeWorkloadsUpdated),
                    fmt.Sprintf("Error updating workload type(%s) %s when updating: %s", templateType, u.Name, updateWorkloadErr))
                // BUGFIX: previously logged the outer provision error `err`
                // (always nil here) instead of the actual update error.
                klog.Errorf("YurtAppDaemon[%s/%s] update workload[%s/%s/%s] error %v", instance.GetNamespace(), instance.GetName(),
                    templateType, u.Namespace, u.Name, updateWorkloadErr)
            }
            return updateWorkloadErr
        })
    }

    if updateErr == nil {
        SetYurtAppDaemonCondition(newStatus, NewYurtAppDaemonCondition(unitv1alpha1.WorkLoadUpdated, corev1.ConditionTrue, "", ""))
    } else {
        SetYurtAppDaemonCondition(newStatus, NewYurtAppDaemonCondition(unitv1alpha1.WorkLoadUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
    }
    return newStatus, updateErr
}
// manageWorkloadsProvision creates one workload per nodepool in needCreate (in
// slow-start batches) and deletes every workload in needDeleted. It returns
// whether any provisioning was attempted, plus all accumulated errors.
func (r *ReconcileYurtAppDaemon) manageWorkloadsProvision(instance *unitv1alpha1.YurtAppDaemon,
allNameToNodePools map[string]unitv1alpha1.NodePool, expectedRevision string, templateType unitv1alpha1.TemplateType,
needDeleted []*workloadcontroller.Workload, needCreate []string) (bool, error) {
// Create workloads for nodepools that do not have one yet.
var errs []error
if len(needCreate) > 0 {
// do not consider deletion
var createdNum int
var createdErr error
createdNum, createdErr = util.SlowStartBatch(len(needCreate), slowStartInitialBatchSize, func(idx int) error {
nodepoolName := needCreate[idx]
err := r.controls[templateType].CreateWorkload(instance, allNameToNodePools[nodepoolName], expectedRevision)
//err := r.poolControls[workloadType].CreatePool(ud, poolName, revision, replicas)
if err != nil {
klog.Errorf("YurtAppDaemon[%s/%s] templatetype %s create workload by nodepool %s error: %s",
instance.GetNamespace(), instance.GetName(), templateType, nodepoolName, err.Error())
// Timeout errors are tolerated here: the object may still be created
// server-side, so the next reconcile will observe it.
if !errors.IsTimeout(err) {
return fmt.Errorf("YurtAppDaemon[%s/%s] templatetype %s create workload by nodepool %s error: %s",
instance.GetNamespace(), instance.GetName(), templateType, nodepoolName, err.Error())
}
}
klog.Infof("YurtAppDaemon[%s/%s] templatetype %s create workload by nodepool %s success",
instance.GetNamespace(), instance.GetName(), templateType, nodepoolName)
return nil
})
if createdErr == nil {
r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful %s", eventTypeWorkloadsCreated), "Create %d Workload type(%s)", createdNum, templateType)
} else {
errs = append(errs, createdErr)
}
}
// manage deleting
if len(needDeleted) > 0 {
var deleteErrs []error
// need deleted
for _, d := range needDeleted {
if err := r.controls[templateType].DeleteWorkload(instance, d); err != nil {
deleteErrs = append(deleteErrs, fmt.Errorf("YurtAppDaemon[%s/%s] delete workload[%s/%s/%s] error %v",
instance.GetNamespace(), instance.GetName(), templateType, d.Namespace, d.Name, err))
}
}
if len(deleteErrs) > 0 {
errs = append(errs, deleteErrs...)
} else {
r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful %s", eventTypeWorkloadsDeleted), "Delete %d Workload type(%s)", len(needDeleted), templateType)
}
}
return len(needCreate) > 0 || len(needDeleted) > 0, utilerrors.NewAggregate(errs)
}
// classifyWorkloads buckets the existing workloads of instance into those that
// must be deleted (their nodepool is no longer selected), those that must be
// updated (node selector, tolerations, or revision drifted), and the names of
// selected nodepools that still need a workload created.
func (r *ReconcileYurtAppDaemon) classifyWorkloads(instance *unitv1alpha1.YurtAppDaemon, currentNodepoolToWorkload map[string]*workloadcontroller.Workload,
    allNameToNodePools map[string]unitv1alpha1.NodePool, expectedRevision string) (needDeleted, needUpdate []*workloadcontroller.Workload, needCreate []string) {
    for npName, load := range currentNodepoolToWorkload {
        np, selected := allNameToNodePools[npName]
        if !selected {
            // The nodepool is no longer targeted: drop its workload.
            needDeleted = append(needDeleted, load)
            klog.V(4).Infof("YurtAppDaemon[%s/%s] need delete [%s/%s/%s]", instance.GetNamespace(),
                instance.GetName(), load.GetKind(), load.Namespace, load.Name)
            continue
        }
        // A workload is up to date only when its node selector, tolerations,
        // and revision all agree with the desired state.
        selectorOK := reflect.DeepEqual(load.GetNodeSelector(), workloadcontroller.CreateNodeSelectorByNodepoolName(npName))
        tolerationOK := IsTolerationsAllTaints(load.GetToleration(), np.Spec.Taints)
        revisionOK := load.GetRevision() == expectedRevision
        if !(selectorOK && tolerationOK && revisionOK) {
            klog.V(4).Infof("YurtAppDaemon[%s/%s] need update [%s/%s/%s]", instance.GetNamespace(),
                instance.GetName(), load.GetKind(), load.Namespace, load.Name)
            needUpdate = append(needUpdate, load)
        }
    }
    for name := range allNameToNodePools {
        if _, exists := currentNodepoolToWorkload[name]; !exists {
            needCreate = append(needCreate, name)
            klog.V(4).Infof("YurtAppDaemon[%s/%s] need create new workload by nodepool %s", instance.GetNamespace(),
                instance.GetName(), name)
        }
    }
    return
}
// getNameToNodePools lists the nodepools matched by instance's
// spec.nodePoolSelector and indexes them by name.
func (r *ReconcileYurtAppDaemon) getNameToNodePools(instance *unitv1alpha1.YurtAppDaemon) (map[string]unitv1alpha1.NodePool, error) {
    klog.V(4).Infof("YurtAppDaemon [%s/%s] prepare to get associated nodepools",
        instance.Namespace, instance.Name)

    nodepoolSelector, err := metav1.LabelSelectorAsSelector(instance.Spec.NodePoolSelector)
    if err != nil {
        return nil, err
    }

    nodepools := unitv1alpha1.NodePoolList{}
    if err := r.Client.List(context.TODO(), &nodepools, &client.ListOptions{LabelSelector: nodepoolSelector}); err != nil {
        klog.Errorf("YurtAppDaemon [%s/%s] could not get NodePoolList", instance.GetNamespace(),
            instance.GetName())
        // BUGFIX: propagate the list error. Returning (nil, nil) here made
        // callers treat the nil map as "no nodepools selected", which would
        // classify every existing workload for deletion.
        return nil, err
    }

    indexes := make(map[string]unitv1alpha1.NodePool, len(nodepools.Items))
    for i, np := range nodepools.Items {
        indexes[np.GetName()] = np
        klog.V(4).Infof("YurtAppDaemon [%s/%s] get %d's associated nodepools %s",
            instance.Namespace, instance.Name, i, np.Name)
    }
    return indexes, nil
}
// getTemplateControls selects the workload controller that matches the
// template kind declared in instance.Spec.WorkloadTemplate (StatefulSet takes
// precedence over Deployment); it errors when neither template is set.
func (r *ReconcileYurtAppDaemon) getTemplateControls(instance *unitv1alpha1.YurtAppDaemon) (workloadcontroller.WorkloadController,
    unitv1alpha1.TemplateType, error) {
    var templateType unitv1alpha1.TemplateType
    switch {
    case instance.Spec.WorkloadTemplate.StatefulSetTemplate != nil:
        templateType = unitv1alpha1.StatefulSetTemplateType
    case instance.Spec.WorkloadTemplate.DeploymentTemplate != nil:
        templateType = unitv1alpha1.DeploymentTemplateType
    default:
        klog.Errorf("The appropriate WorkloadTemplate was not found")
        return nil, "", fmt.Errorf("The appropriate WorkloadTemplate was not found, Now Support(%s/%s)",
            unitv1alpha1.StatefulSetTemplateType, unitv1alpha1.DeploymentTemplateType)
    }
    return r.controls[templateType], templateType, nil
}
// getNodePoolToWorkLoad lists all workloads owned by instance through control
// and indexes them by the nodepool recorded in each workload's annotation.
// Workloads without a nodepool annotation are skipped with a warning.
func (r *ReconcileYurtAppDaemon) getNodePoolToWorkLoad(instance *unitv1alpha1.YurtAppDaemon, c workloadcontroller.WorkloadController) (map[string]*workloadcontroller.Workload, error) {
klog.V(4).Infof("YurtAppDaemon [%s/%s/%s] prepare to get all workload", c.GetTemplateType(), instance.Namespace, instance.Name)
nodePoolsToWorkloads := make(map[string]*workloadcontroller.Workload)
workloads, err := c.GetAllWorkloads(instance)
if err != nil {
klog.Errorf("Get all workloads for YurtAppDaemon[%s/%s] error %v", instance.GetNamespace(),
instance.GetName(), err)
return nil, err
}
// Index each workload by the nodepool recorded in its annotation.
for i, w := range workloads {
if w.GetNodePoolName() != "" {
nodePoolsToWorkloads[w.GetNodePoolName()] = workloads[i]
klog.V(4).Infof("YurtAppDaemon [%s/%s] get %d's workload[%s/%s/%s]",
instance.Namespace, instance.Name, i, c.GetTemplateType(), w.Namespace, w.Name)
} else {
klog.Warningf("YurtAppDaemon [%s/%s] %d's workload[%s/%s/%s] has no nodepool annotation",
instance.Namespace, instance.Name, i, c.GetTemplateType(), w.Namespace, w.Name)
}
}
klog.V(4).Infof("YurtAppDaemon [%s/%s] get %d %s workloads",
instance.Namespace, instance.Name, len(nodePoolsToWorkloads), c.GetTemplateType())
return nodePoolsToWorkloads, nil
}
//go:build gofuzz
// +build gofuzz
/*
Copyright 2022 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappdaemon
import (
"context"
fuzz "github.com/AdaLogics/go-fuzz-headers"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
appsv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappdaemon/workloadcontroller"
)
var (
// fuzzCtx is the shared context passed to Reconcile during fuzzing.
fuzzCtx = context.Background()
// fakeSchemeForFuzzing carries the types registered in init() below so the
// fake client can (de)serialize them.
fakeSchemeForFuzzing = runtime.NewScheme()
)
// init registers client-go, OpenYurt apps/v1alpha1, and core/v1 types on the
// fuzzing scheme; registration errors are intentionally ignored here.
func init() {
_ = clientgoscheme.AddToScheme(fakeSchemeForFuzzing)
_ = appsv1alpha1.AddToScheme(fakeSchemeForFuzzing)
_ = corev1.AddToScheme(fakeSchemeForFuzzing)
}
// FuzzAppDaemonReconcile fuzzes the YurtAppDaemon reconciler: it decodes an
// arbitrary YurtAppDaemon from the fuzz input, seeds a fake client with it,
// and runs a single Reconcile pass. Returns 1 when the input yielded a usable
// object, 0 otherwise.
func FuzzAppDaemonReconcile(data []byte) int {
    consumer := fuzz.NewConsumer(data)
    daemon := &appsv1alpha1.YurtAppDaemon{}
    if err := consumer.GenerateStruct(daemon); err != nil {
        return 0
    }
    fakeClient := fake.NewClientBuilder().WithScheme(fakeSchemeForFuzzing).WithObjects(daemon).Build()
    reconciler := &ReconcileYurtAppDaemon{
        Client:   fakeClient,
        scheme:   fakeSchemeForFuzzing,
        recorder: record.NewFakeRecorder(10000),
        controls: map[appsv1alpha1.TemplateType]workloadcontroller.WorkloadController{
            appsv1alpha1.DeploymentTemplateType: &workloadcontroller.DeploymentControllor{Client: fakeClient, Scheme: fakeSchemeForFuzzing},
        },
    }
    key := types.NamespacedName{Name: daemon.Name, Namespace: daemon.Namespace}
    _, _ = reconciler.Reconcile(fuzzCtx, reconcile.Request{NamespacedName: key})
    return 1
}
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter
import (
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog/v2"
"github.com/openyurtio/openyurt/pkg/apis/apps"
appsv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
)
// getPoolPrefix builds the GenerateName prefix "<controllerName>-<poolName>-"
// for pool workloads. When the combined string is not a valid DNS subdomain
// prefix, it falls back to "<controllerName>-" alone.
func getPoolPrefix(controllerName, poolName string) string {
    combined := fmt.Sprintf("%s-%s-", controllerName, poolName)
    if errs := validation.NameIsDNSSubdomain(combined, true); len(errs) == 0 {
        return combined
    }
    return fmt.Sprintf("%s-", controllerName)
}
// attachNodeAffinityAndTolerations applies the pool's scheduling constraints
// to the pod spec: required node affinity terms plus tolerations.
func attachNodeAffinityAndTolerations(podSpec *corev1.PodSpec, pool *appsv1alpha1.Pool) {
attachNodeAffinity(podSpec, pool)
attachTolerations(podSpec, pool)
}
// attachNodeAffinity merges the pool's NodeSelectorTerm match expressions into
// every required-during-scheduling node selector term of the pod spec,
// initializing the affinity chain (and one empty term) when absent.
func attachNodeAffinity(podSpec *corev1.PodSpec, pool *appsv1alpha1.Pool) {
    if podSpec.Affinity == nil {
        podSpec.Affinity = &corev1.Affinity{}
    }
    if podSpec.Affinity.NodeAffinity == nil {
        podSpec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
    }
    nodeAffinity := podSpec.Affinity.NodeAffinity
    if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
        nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{}
    }
    required := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
    if required.NodeSelectorTerms == nil {
        required.NodeSelectorTerms = []corev1.NodeSelectorTerm{}
    }
    if len(required.NodeSelectorTerms) == 0 {
        required.NodeSelectorTerms = append(required.NodeSelectorTerms, corev1.NodeSelectorTerm{})
    }
    // Each pool expression is appended to every existing term so that every
    // alternative term still enforces all pool constraints.
    for _, expr := range pool.NodeSelectorTerm.MatchExpressions {
        for i := range required.NodeSelectorTerms {
            required.NodeSelectorTerms[i].MatchExpressions = append(required.NodeSelectorTerms[i].MatchExpressions, expr)
        }
    }
}
// attachTolerations appends the pool's tolerations (if any) to the pod spec.
func attachTolerations(podSpec *corev1.PodSpec, poolConfig *appsv1alpha1.Pool) {
    // Nothing to attach when the pool declares no tolerations.
    if poolConfig.Tolerations == nil {
        return
    }
    // append handles a nil destination slice, so the former explicit
    // empty-slice initialization and trailing bare return were redundant.
    podSpec.Tolerations = append(podSpec.Tolerations, poolConfig.Tolerations...)
}
// getRevision returns the controller-revision hash label of objMeta, or the
// empty string when the object has no labels (or no hash label).
func getRevision(objMeta metav1.Object) string {
    labels := objMeta.GetLabels()
    if labels == nil {
        return ""
    }
    return labels[apps.ControllerRevisionHashLabelKey]
}
// getCurrentPartition calculates current partition by counting the pods not having the updated revision
func getCurrentPartition(pods []*corev1.Pod, revision string) *int32 {
    var stale int32
    for i := range pods {
        if getRevision(&pods[i].ObjectMeta) != revision {
            stale++
        }
    }
    return &stale
}
// StrategicMergeByPatches applies the raw strategic-merge patch to oldobj and
// decodes the merged result into newPatched. newPatched must be a pointer to
// the same concrete type as oldobj, since its type drives the merge semantics.
func StrategicMergeByPatches(oldobj interface{}, patch *runtime.RawExtension, newPatched interface{}) error {
    patchMap := make(map[string]interface{})
    if err := json.Unmarshal(patch.Raw, &patchMap); err != nil {
        klog.Errorf("Unmarshal pool patch error %v, patch Raw %v", err, string(patch.Raw))
        return err
    }
    originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(oldobj)
    if err != nil {
        klog.Errorf("ToUnstructured error %v", err)
        return err
    }
    patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalObjMap, patchMap, newPatched)
    if err != nil {
        // FIX: the log message previously misspelled "StartegicMergeMapPatch".
        klog.Errorf("StrategicMergeMapPatch error %v", err)
        return err
    }
    if err := runtime.DefaultUnstructuredConverter.FromUnstructured(patchedObjMap, newPatched); err != nil {
        klog.Errorf("FromUnstructured error %v", err)
        return err
    }
    return nil
}
// PoolHasPatch reports whether the pool config carries a patch. When it does
// not, the patch annotation on set is reset to "" so stale patch data is not
// left behind on the workload.
func PoolHasPatch(poolConfig *appsv1alpha1.Pool, set metav1.Object) bool {
    if poolConfig.Patch != nil {
        return true
    }
    // If No Patches, Must Set patches annotation to ""
    if anno := set.GetAnnotations(); anno != nil {
        anno[apps.AnnotationPatchKey] = ""
    }
    return false
}
// CreateNewPatchedObject strategically merges patchInfo into set, writes the
// result into newPatched, and records the raw patch bytes in newPatched's
// patch annotation.
func CreateNewPatchedObject(patchInfo *runtime.RawExtension, set metav1.Object, newPatched metav1.Object) error {
    if err := StrategicMergeByPatches(set, patchInfo, newPatched); err != nil {
        return err
    }
    anno := newPatched.GetAnnotations()
    if anno == nil {
        newPatched.SetAnnotations(map[string]string{
            apps.AnnotationPatchKey: string(patchInfo.Raw),
        })
    } else {
        anno[apps.AnnotationPatchKey] = string(patchInfo.Raw)
    }
    return nil
}
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/openyurtio/openyurt/pkg/apis/apps"
alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
)
// DeploymentAdapter adapts appsv1.Deployment objects to the generic pool
// Adapter interface used by the YurtAppSet controller.
type DeploymentAdapter struct {
client.Client // client for reading/writing Deployment objects
Scheme *runtime.Scheme // scheme used to set owner references on pools
}
var _ Adapter = &DeploymentAdapter{} // compile-time interface conformance check
// NewResourceObject creates an empty Deployment object.
func (a *DeploymentAdapter) NewResourceObject() runtime.Object {
return &appsv1.Deployment{}
}
// NewResourceListObject creates an empty DeploymentList object.
func (a *DeploymentAdapter) NewResourceListObject() runtime.Object {
return &appsv1.DeploymentList{}
}
// GetStatusObservedGeneration returns the observed generation of the pool.
func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
    deploy := obj.(*appsv1.Deployment)
    return deploy.Status.ObservedGeneration
}
// GetDetails returns the replicas detail the pool needs: the desired replica
// count from the spec (0 when unset) and the ready replicas from the status.
func (a *DeploymentAdapter) GetDetails(obj metav1.Object) (ReplicasInfo, error) {
    deploy := obj.(*appsv1.Deployment)
    info := ReplicasInfo{ReadyReplicas: deploy.Status.ReadyReplicas}
    if deploy.Spec.Replicas != nil {
        info.Replicas = *deploy.Spec.Replicas
    }
    return info, nil
}
// GetAvailableStatus returns the status of the Deployment's Available
// condition, or ConditionUnknown when no such condition has been recorded.
func (a *DeploymentAdapter) GetAvailableStatus(obj metav1.Object) (conditionStatus corev1.ConditionStatus, err error) {
    deploy := obj.(*appsv1.Deployment)
    for i := range deploy.Status.Conditions {
        if deploy.Status.Conditions[i].Type == appsv1.DeploymentAvailable {
            return deploy.Status.Conditions[i].Status, nil
        }
    }
    return corev1.ConditionUnknown, nil
}
// GetPoolFailure returns the failure information of the pool.
// Deployment has no condition.
func (a *DeploymentAdapter) GetPoolFailure() *string {
return nil
}
// ApplyPoolTemplate updates the pool to the latest revision, depending on the DeploymentTemplate.
// It copies labels/annotations/spec from the YurtAppSet's DeploymentTemplate
// onto set, stamps the revision and pool-name labels, applies the pool's node
// affinity and tolerations, and finally strategic-merges the pool's patch (if
// any) over the result in place.
func (a *DeploymentAdapter) ApplyPoolTemplate(yas *alpha1.YurtAppSet, poolName, revision string,
replicas int32, obj runtime.Object) error {
set := obj.(*appsv1.Deployment)
// Locate the pool config matching poolName in the topology.
var poolConfig *alpha1.Pool
for i, pool := range yas.Spec.Topology.Pools {
if pool.Name == poolName {
poolConfig = &(yas.Spec.Topology.Pools[i])
break
}
}
if poolConfig == nil {
return fmt.Errorf("could not find pool config %s", poolName)
}
// Copy template labels, then selector labels, then stamp revision/pool name.
set.Namespace = yas.Namespace
if set.Labels == nil {
set.Labels = map[string]string{}
}
for k, v := range yas.Spec.WorkloadTemplate.DeploymentTemplate.Labels {
set.Labels[k] = v
}
for k, v := range yas.Spec.Selector.MatchLabels {
set.Labels[k] = v
}
set.Labels[apps.ControllerRevisionHashLabelKey] = revision
// record the pool name as a label
set.Labels[apps.PoolNameLabelKey] = poolName
if set.Annotations == nil {
set.Annotations = map[string]string{}
}
for k, v := range yas.Spec.WorkloadTemplate.DeploymentTemplate.Annotations {
set.Annotations[k] = v
}
set.GenerateName = getPoolPrefix(yas.Name, poolName)
// Scope the selector to this pool and make yas the controlling owner.
selectors := yas.Spec.Selector.DeepCopy()
selectors.MatchLabels[apps.PoolNameLabelKey] = poolName
if err := controllerutil.SetControllerReference(yas, set, a.Scheme); err != nil {
return err
}
// Deep-copy spec fields from the template so later mutations don't alias it.
set.Spec.Selector = selectors
set.Spec.Replicas = &replicas
set.Spec.Strategy = *yas.Spec.WorkloadTemplate.DeploymentTemplate.Spec.Strategy.DeepCopy()
set.Spec.Template = *yas.Spec.WorkloadTemplate.DeploymentTemplate.Spec.Template.DeepCopy()
if set.Spec.Template.Labels == nil {
set.Spec.Template.Labels = map[string]string{}
}
set.Spec.Template.Labels[apps.PoolNameLabelKey] = poolName
set.Spec.Template.Labels[apps.ControllerRevisionHashLabelKey] = revision
set.Spec.RevisionHistoryLimit = yas.Spec.RevisionHistoryLimit
set.Spec.MinReadySeconds = yas.Spec.WorkloadTemplate.DeploymentTemplate.Spec.MinReadySeconds
set.Spec.Paused = yas.Spec.WorkloadTemplate.DeploymentTemplate.Spec.Paused
set.Spec.ProgressDeadlineSeconds = yas.Spec.WorkloadTemplate.DeploymentTemplate.Spec.ProgressDeadlineSeconds
attachNodeAffinityAndTolerations(&set.Spec.Template.Spec, poolConfig)
// Without a pool patch there is nothing left to merge.
if !PoolHasPatch(poolConfig, set) {
klog.Infof("Deployment[%s/%s-] has no patches, do not need strategicmerge", set.Namespace,
set.GenerateName)
return nil
}
// Apply the pool's strategic-merge patch and copy the result back into set.
patched := &appsv1.Deployment{}
if err := CreateNewPatchedObject(poolConfig.Patch, set, patched); err != nil {
klog.Errorf("Deployment[%s/%s-] strategic merge by patch %s error %v", set.Namespace,
set.GenerateName, string(poolConfig.Patch.Raw), err)
return err
}
patched.DeepCopyInto(set)
klog.Infof("Deployment [%s/%s-] has patches configure successfully:%v", set.Namespace,
set.GenerateName, string(poolConfig.Patch.Raw))
return nil
}
// PostUpdate does some works after pool updated. Deployment will implement this method to clean stuck pods.
func (a *DeploymentAdapter) PostUpdate(yas *alpha1.YurtAppSet, obj runtime.Object, revision string) error {
	// Intentionally a no-op for Deployment.
	return nil
}
// IsExpected checks the pool is the expected revision or not.
// NOTE(review): despite the name, this returns true when the revision label
// does NOT match the given revision (i.e. the pool still needs an update).
// Callers appear to rely on this inverted meaning — confirm before renaming.
func (a *DeploymentAdapter) IsExpected(obj metav1.Object, revision string) bool {
	return obj.GetLabels()[apps.ControllerRevisionHashLabelKey] != revision
}
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@CHANGELOG
OpenYurt Authors:
change statefulset adapter
*/
package adapter
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/openyurtio/openyurt/pkg/apis/apps"
alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
)
// StatefulSetAdapter implements the Adapter interface for pools backed by
// StatefulSet workloads.
type StatefulSetAdapter struct {
	client.Client
	// Scheme is used when setting owner references on pooled workloads.
	Scheme *runtime.Scheme
}

// Compile-time check that StatefulSetAdapter satisfies the Adapter interface.
var _ Adapter = &StatefulSetAdapter{}
// NewResourceObject creates an empty StatefulSet object.
func (a *StatefulSetAdapter) NewResourceObject() runtime.Object {
	return &appsv1.StatefulSet{}
}
// NewResourceListObject creates an empty StatefulSetList object.
func (a *StatefulSetAdapter) NewResourceListObject() runtime.Object {
	return &appsv1.StatefulSetList{}
}
// GetStatusObservedGeneration returns the observed generation of the pool.
func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
	sts := obj.(*appsv1.StatefulSet)
	return sts.Status.ObservedGeneration
}
// GetDetails returns the replicas detail the pool needs: the desired replica
// count from the spec (zero when unset) and the ready replicas from status.
func (a *StatefulSetAdapter) GetDetails(obj metav1.Object) (ReplicasInfo, error) {
	sts := obj.(*appsv1.StatefulSet)
	info := ReplicasInfo{ReadyReplicas: sts.Status.ReadyReplicas}
	if sts.Spec.Replicas != nil {
		info.Replicas = *sts.Spec.Replicas
	}
	return info, nil
}
// GetAvailableStatus returns the available condition status of the workload:
// ConditionTrue only when every replica reported by status is available.
func (a *StatefulSetAdapter) GetAvailableStatus(obj metav1.Object) (conditionStatus corev1.ConditionStatus, err error) {
	sts := obj.(*appsv1.StatefulSet)
	if sts.Status.AvailableReplicas == sts.Status.Replicas {
		return corev1.ConditionTrue, nil
	}
	return corev1.ConditionFalse, nil
}
// GetPoolFailure returns the failure information of the pool.
// StatefulSet exposes no failure condition here, so this always returns nil.
func (a *StatefulSetAdapter) GetPoolFailure() *string {
	return nil
}
// ApplyPoolTemplate updates the pool to the latest revision, depending on the StatefulSetTemplate.
// It fills obj (a *appsv1.StatefulSet) in place from the YurtAppSet's
// StatefulSetTemplate plus the per-pool configuration, then applies the pool's
// strategic-merge patch when one is configured.
func (a *StatefulSetAdapter) ApplyPoolTemplate(yas *alpha1.YurtAppSet, poolName, revision string,
	replicas int32, obj runtime.Object) error {
	set := obj.(*appsv1.StatefulSet)

	// Find the pool's entry in the topology; index into the slice so the
	// pointer refers to the stored element, not the loop copy.
	var poolConfig *alpha1.Pool
	for i, pool := range yas.Spec.Topology.Pools {
		if pool.Name == poolName {
			poolConfig = &(yas.Spec.Topology.Pools[i])
			break
		}
	}
	if poolConfig == nil {
		return fmt.Errorf("could not find pool config %s", poolName)
	}

	set.Namespace = yas.Namespace

	// Merge labels: template labels first, then selector labels, then the
	// revision-hash and pool-name bookkeeping labels (later writes win).
	if set.Labels == nil {
		set.Labels = map[string]string{}
	}
	for k, v := range yas.Spec.WorkloadTemplate.StatefulSetTemplate.Labels {
		set.Labels[k] = v
	}
	for k, v := range yas.Spec.Selector.MatchLabels {
		set.Labels[k] = v
	}
	set.Labels[apps.ControllerRevisionHashLabelKey] = revision
	// record the pool name as a label
	set.Labels[apps.PoolNameLabelKey] = poolName

	if set.Annotations == nil {
		set.Annotations = map[string]string{}
	}
	for k, v := range yas.Spec.WorkloadTemplate.StatefulSetTemplate.Annotations {
		set.Annotations[k] = v
	}

	// The workload name prefix is derived from the YurtAppSet and pool names.
	set.GenerateName = getPoolPrefix(yas.Name, poolName)

	// The selector must also match the pool-name label added above.
	selectors := yas.Spec.Selector.DeepCopy()
	selectors.MatchLabels[apps.PoolNameLabelKey] = poolName

	if err := controllerutil.SetControllerReference(yas, set, a.Scheme); err != nil {
		return err
	}

	// Copy the spec from the template; UpdateStrategy and Template are
	// deep-copied so later mutations do not alias back into the YurtAppSet.
	set.Spec.Selector = selectors
	set.Spec.Replicas = &replicas
	set.Spec.UpdateStrategy = *yas.Spec.WorkloadTemplate.StatefulSetTemplate.Spec.UpdateStrategy.DeepCopy()
	set.Spec.Template = *yas.Spec.WorkloadTemplate.StatefulSetTemplate.Spec.Template.DeepCopy()
	if set.Spec.Template.Labels == nil {
		set.Spec.Template.Labels = map[string]string{}
	}
	set.Spec.Template.Labels[apps.PoolNameLabelKey] = poolName
	set.Spec.Template.Labels[apps.ControllerRevisionHashLabelKey] = revision
	set.Spec.RevisionHistoryLimit = yas.Spec.RevisionHistoryLimit
	set.Spec.PodManagementPolicy = yas.Spec.WorkloadTemplate.StatefulSetTemplate.Spec.PodManagementPolicy
	set.Spec.ServiceName = yas.Spec.WorkloadTemplate.StatefulSetTemplate.Spec.ServiceName
	set.Spec.VolumeClaimTemplates = yas.Spec.WorkloadTemplate.StatefulSetTemplate.Spec.VolumeClaimTemplates

	// Constrain the pods to the pool's nodes.
	attachNodeAffinityAndTolerations(&set.Spec.Template.Spec, poolConfig)

	if !PoolHasPatch(poolConfig, set) {
		klog.Infof("StatefulSet[%s/%s-] has no patches, do not need strategicmerge", set.Namespace,
			set.GenerateName)
		return nil
	}

	// Apply the pool's strategic-merge patch on top of the templated object
	// and copy the result back into set.
	patched := &appsv1.StatefulSet{}
	if err := CreateNewPatchedObject(poolConfig.Patch, set, patched); err != nil {
		klog.Errorf("StatefulSet[%s/%s-] strategic merge by patch %s error %v", set.Namespace,
			set.GenerateName, string(poolConfig.Patch.Raw), err)
		return err
	}
	patched.DeepCopyInto(set)

	klog.Infof("Statefulset [%s/%s-] has patches configure successfully:%v", set.Namespace,
		set.GenerateName, string(poolConfig.Patch.Raw))
	return nil
}
// PostUpdate does some works after pool updated. StatefulSet will implement this method to clean stuck pods.
// The stuck-pod cleanup below is currently disabled, so this is a no-op.
func (a *StatefulSetAdapter) PostUpdate(yas *alpha1.YurtAppSet, obj runtime.Object, revision string) error {
	/*
		if strategy == nil {
			return nil
		}
		set := obj.(*appsv1.StatefulSet)
		if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
			return nil
		}

		// If RollingUpdate, work around for issue https://github.com/kubernetes/kubernetes/issues/67250
		return a.deleteStuckPods(set, revision, strategy.GetPartition())
	*/
	return nil
}
// IsExpected checks the pool is the expected revision or not.
// NOTE(review): despite the name, this returns true when the revision label
// does NOT match the given revision (i.e. the pool still needs an update).
// Callers appear to rely on this inverted meaning — confirm before renaming.
func (a *StatefulSetAdapter) IsExpected(obj metav1.Object, revision string) bool {
	return obj.GetLabels()[apps.ControllerRevisionHashLabelKey] != revision
}
/*
func (a *StatefulSetAdapter) getStatefulSetPods(set *appsv1.StatefulSet) ([]*corev1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
podList := &corev1.PodList{}
err = a.Client.List(context.TODO(), podList, &client.ListOptions{LabelSelector: selector})
if err != nil {
return nil, err
}
manager, err := refmanager.New(a.Client, set.Spec.Selector, set, a.Scheme)
if err != nil {
return nil, err
}
selected := make([]metav1.Object, len(podList.Items))
for i, pod := range podList.Items {
selected[i] = pod.DeepCopy()
}
claimed, err := manager.ClaimOwnedObjects(selected)
if err != nil {
return nil, err
}
claimedPods := make([]*corev1.Pod, len(claimed))
for i, pod := range claimed {
claimedPods[i] = pod.(*corev1.Pod)
}
return claimedPods, nil
}
// deleteStuckPods tries to work around the blocking issue https://github.com/kubernetes/kubernetes/issues/67250
func (a *StatefulSetAdapter) deleteStuckPods(set *appsv1.StatefulSet, revision string, partition int32) error {
pods, err := a.getStatefulSetPods(set)
if err != nil {
return err
}
for i := range pods {
pod := pods[i]
// If the pod is considered as stuck, delete it.
if isPodStuckForRollingUpdate(pod, revision, partition) {
klog.V(2).Infof("Delete pod %s/%s at stuck state", pod.Namespace, pod.Name)
err = a.Delete(context.TODO(), pod, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return err
}
}
}
return nil
}
// isPodStuckForRollingUpdate checks whether the pod is stuck under strategy RollingUpdate.
// If a pod needs to upgrade (pod_ordinal >= partition && pod_revision != sts_revision)
// and its readiness is false, or worse status like Pending, ImagePullBackOff, it will be blocked.
func isPodStuckForRollingUpdate(pod *corev1.Pod, revision string, partition int32) bool {
if yurtctlutil.GetOrdinal(pod) < partition {
return false
}
if getRevision(pod) == revision {
return false
}
return !podutil.IsPodReadyConditionTrue(pod.Status)
}
*/
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappset
import (
"context"
"errors"
"reflect"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/openyurtio/openyurt/pkg/apis/apps"
alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util/refmanager"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappset/adapter"
)
// PoolControl provides pool operations for the workload kind handled by its
// adapter (StatefulSet or Deployment — see how it is wired in newReconciler).
type PoolControl struct {
	client.Client
	// scheme is used for owner-reference management when claiming workloads.
	scheme *runtime.Scheme
	// adapter abstracts the concrete workload type behind the pool.
	adapter adapter.Adapter
}
// GetAllPools returns all pools owned by the YurtAppSet.
// It lists the adapter's workload kind by the YurtAppSet selector, adopts or
// releases objects through the ref manager, and converts each claimed object
// into a Pool.
func (m *PoolControl) GetAllPools(yas *alpha1.YurtAppSet) (pools []*Pool, err error) {
	selector, err := metav1.LabelSelectorAsSelector(yas.Spec.Selector)
	if err != nil {
		return nil, err
	}

	setList := m.adapter.NewResourceListObject()
	cliSetList, ok := setList.(client.ObjectList)
	if !ok {
		return nil, errors.New("could not convert runtime object to client.ObjectList")
	}
	err = m.Client.List(context.TODO(), cliSetList, &client.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}

	manager, err := refmanager.New(m.Client, yas.Spec.Selector, yas, m.scheme)
	if err != nil {
		return nil, err
	}

	// The concrete list type is only known to the adapter, so extract the
	// Items slice via reflection; each element implements metav1.Object.
	v := reflect.ValueOf(setList).Elem().FieldByName("Items")
	selected := make([]metav1.Object, v.Len())
	for i := 0; i < v.Len(); i++ {
		selected[i] = v.Index(i).Addr().Interface().(metav1.Object)
	}
	claimedSets, err := manager.ClaimOwnedObjects(selected)
	if err != nil {
		return nil, err
	}
	for _, claimedSet := range claimedSets {
		pool, err := m.convertToPool(claimedSet)
		if err != nil {
			return nil, err
		}
		pools = append(pools, pool)
	}
	return pools, nil
}
// CreatePool creates the Pool workload depending on the inputs: it renders a
// new workload object from the YurtAppSet template for the named pool at the
// given revision and replica count, then creates it in the cluster.
func (m *PoolControl) CreatePool(yas *alpha1.YurtAppSet, poolName string, revision string,
	replicas int32) error {
	set := m.adapter.NewResourceObject()
	// Bug fix: the error from ApplyPoolTemplate was previously ignored, which
	// could create a half-initialized workload (e.g. when the pool config is
	// missing from the topology).
	if err := m.adapter.ApplyPoolTemplate(yas, poolName, revision, replicas, set); err != nil {
		return err
	}

	klog.V(4).Infof("Have %d replicas when creating Pool for YurtAppSet %s/%s", replicas, yas.Namespace, yas.Name)
	cliSet, ok := set.(client.Object)
	if !ok {
		return errors.New("could not convert runtime.Object to client.Object")
	}
	return m.Create(context.TODO(), cliSet)
}
// UpdatePool is used to update the pool. The target Pool workload can be found with the input pool.
// It re-reads the workload and re-applies the pool template before each write,
// retrying up to updateRetries times on update errors, and finally runs the
// adapter's PostUpdate hook.
func (m *PoolControl) UpdatePool(pool *Pool, yas *alpha1.YurtAppSet, revision string, replicas int32) error {
	set := m.adapter.NewResourceObject()
	cliSet, ok := set.(client.Object)
	if !ok {
		return errors.New("could not convert runtime.Object to client.Object")
	}
	var updateError error
	for i := 0; i < updateRetries; i++ {
		// Fetch the latest version before each attempt so a conflicting
		// concurrent write does not make every retry fail.
		getError := m.Client.Get(context.TODO(), m.objectKey(pool), cliSet)
		if getError != nil {
			return getError
		}

		if err := m.adapter.ApplyPoolTemplate(yas, pool.Name, revision, replicas, set); err != nil {
			return err
		}
		updateError = m.Client.Update(context.TODO(), cliSet)
		if updateError == nil {
			break
		}
	}
	if updateError != nil {
		return updateError
	}
	return m.adapter.PostUpdate(yas, set, revision)
}
// DeletePool deletes the workload referenced by the given pool, using
// background propagation so dependents are garbage-collected asynchronously.
func (m *PoolControl) DeletePool(pool *Pool) error {
	workload := pool.Spec.PoolRef.(runtime.Object)
	cliSet, ok := workload.(client.Object)
	if !ok {
		return errors.New("could not convert runtime.Object to client.Object")
	}
	return m.Delete(context.TODO(), cliSet, client.PropagationPolicy(metav1.DeletePropagationBackground))
}
// GetPoolFailure returns the error message extracted from the Pool workload's
// status conditions; it simply delegates to the adapter.
func (m *PoolControl) GetPoolFailure(pool *Pool) *string {
	return m.adapter.GetPoolFailure()
}
// IsExpected checks the pool is expected revision or not, by delegating to the
// adapter. NOTE(review): the adapters return true when the revision does NOT
// match (i.e. the pool needs an update) — confirm before relying on the name.
func (m *PoolControl) IsExpected(pool *Pool, revision string) bool {
	return m.adapter.IsExpected(pool.Spec.PoolRef, revision)
}
// convertToPool wraps a claimed workload object into a Pool, filling in the
// replica, generation and availability details reported by the adapter.
func (m *PoolControl) convertToPool(set metav1.Object) (*Pool, error) {
	name, err := getPoolNameFrom(set)
	if err != nil {
		return nil, err
	}
	replicas, err := m.adapter.GetDetails(set)
	if err != nil {
		return nil, err
	}
	available, err := m.adapter.GetAvailableStatus(set)
	if err != nil {
		return nil, err
	}
	pool := &Pool{
		Name:      name,
		Namespace: set.GetNamespace(),
		Spec:      PoolSpec{PoolRef: set},
		Status: PoolStatus{
			ObservedGeneration: m.adapter.GetStatusObservedGeneration(set),
			ReplicasInfo:       replicas,
			AvailableCondition: available,
		},
	}
	// Surface the applied patch (if any) from the workload's annotations.
	if patch, ok := set.GetAnnotations()[apps.AnnotationPatchKey]; ok {
		pool.Status.PatchInfo = patch
	}
	return pool, nil
}
// objectKey builds the namespaced name used to fetch the pool's workload.
func (m *PoolControl) objectKey(pool *Pool) client.ObjectKey {
	return client.ObjectKey{
		Namespace: pool.Namespace,
		Name:      pool.Spec.PoolRef.GetName(),
	}
}
/*
Copyright 2020 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappset
import (
"bytes"
"context"
"encoding/json"
"fmt"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
appsalphav1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util/refmanager"
)
// ControllerRevisionHashLabel is the label used to indicate the hash value of a ControllerRevision's Data.
// NOTE(review): not referenced in this file's visible code — confirm external
// usage before removing.
const ControllerRevisionHashLabel = "controller.kubernetes.io/hash"
// controlledHistories returns the ControllerRevisions controlled by this
// YurtAppSet, after letting the ref manager adopt matching orphans and release
// mismatched objects.
func (r *ReconcileYurtAppSet) controlledHistories(yas *appsalphav1.YurtAppSet) ([]*apps.ControllerRevision, error) {
	// List all histories to include those that don't match the selector anymore
	// but have a ControllerRef pointing to the controller.
	selector, err := metav1.LabelSelectorAsSelector(yas.Spec.Selector)
	if err != nil {
		return nil, err
	}
	histories := &apps.ControllerRevisionList{}
	err = r.Client.List(context.TODO(), histories, &client.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	klog.V(1).Infof("List controller revision of YurtAppSet %s/%s: count %d\n", yas.Namespace, yas.Name, len(histories.Items))

	// Use ControllerRefManager to adopt/orphan as needed.
	cm, err := refmanager.New(r.Client, yas.Spec.Selector, yas, r.scheme)
	if err != nil {
		return nil, err
	}
	// DeepCopy each item so claiming works on copies, not the listed objects.
	mts := make([]metav1.Object, len(histories.Items))
	for i, history := range histories.Items {
		mts[i] = history.DeepCopy()
	}
	claims, err := cm.ClaimOwnedObjects(mts)
	if err != nil {
		return nil, err
	}
	claimHistories := make([]*apps.ControllerRevision, len(claims))
	for i, mt := range claims {
		claimHistories[i] = mt.(*apps.ControllerRevision)
	}
	return claimHistories, nil
}
// constructYurtAppSetRevisions lists, sorts and prunes the ControllerRevisions
// owned by the YurtAppSet, then determines the update revision for the current
// spec — reusing or rolling back to an equivalent existing revision when one
// exists, otherwise creating a new one. It returns the current revision, the
// update revision and the (possibly bumped) hash-collision count.
func (r *ReconcileYurtAppSet) constructYurtAppSetRevisions(yas *appsalphav1.YurtAppSet) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
	var currentRevision, updateRevision *apps.ControllerRevision
	revisions, err := r.controlledHistories(yas)
	if err != nil {
		if yas.Status.CollisionCount == nil {
			return currentRevision, updateRevision, 0, err
		}
		return currentRevision, updateRevision, *yas.Status.CollisionCount, err
	}

	history.SortControllerRevisions(revisions)
	cleanedRevision, err := r.cleanExpiredRevision(yas, &revisions)
	if err != nil {
		if yas.Status.CollisionCount == nil {
			return currentRevision, updateRevision, 0, err
		}
		return currentRevision, updateRevision, *yas.Status.CollisionCount, err
	}
	revisions = *cleanedRevision

	// Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
	// This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
	var collisionCount int32
	if yas.Status.CollisionCount != nil {
		collisionCount = *yas.Status.CollisionCount
	}

	// create a new revision from the current set
	updateRevision, err = r.newRevision(yas, nextRevision(revisions), &collisionCount)
	if err != nil {
		return nil, nil, collisionCount, err
	}

	// find any equivalent revisions
	equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
	equalCount := len(equalRevisions)
	revisionCount := len(revisions)

	if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
		// if the equivalent revision is immediately prior the update revision has not changed
		updateRevision = revisions[revisionCount-1]
	} else if equalCount > 0 {
		// if the equivalent revision is not immediately prior we will roll back by incrementing the
		// Revision of the equivalent revision
		equalRevisions[equalCount-1].Revision = updateRevision.Revision
		err := r.Client.Update(context.TODO(), equalRevisions[equalCount-1])
		if err != nil {
			return nil, nil, collisionCount, err
		}
		updateRevision = equalRevisions[equalCount-1]
	} else {
		//if there is no equivalent revision we create a new one
		updateRevision, err = r.createControllerRevision(yas, updateRevision, &collisionCount)
		if err != nil {
			return nil, nil, collisionCount, err
		}
	}

	// attempt to find the revision that corresponds to the current revision
	for i := range revisions {
		if revisions[i].Name == yas.Status.CurrentRevision {
			currentRevision = revisions[i]
		}
	}

	// if the current revision is nil we initialize the history by setting it to the update revision
	if currentRevision == nil {
		currentRevision = updateRevision
	}

	return currentRevision, updateRevision, collisionCount, nil
}
// cleanExpiredRevision deletes the oldest revisions that exceed the configured
// RevisionHistoryLimit, never deleting the revision recorded as current, and
// returns the retained (still-sorted) revisions.
//
// Fixes over the previous version:
//   - a nil Spec.RevisionHistoryLimit no longer panics (treated as "no cleanup");
//   - a live revision among the oldest entries is kept in the returned slice
//     (it was never deleted from the cluster, but used to be dropped from the
//     returned list, hiding it from the current-revision lookup).
func (r *ReconcileYurtAppSet) cleanExpiredRevision(yas *appsalphav1.YurtAppSet,
	sortedRevisions *[]*apps.ControllerRevision) (*[]*apps.ControllerRevision, error) {
	if yas.Spec.RevisionHistoryLimit == nil {
		return sortedRevisions, nil
	}
	exceedNum := len(*sortedRevisions) - int(*yas.Spec.RevisionHistoryLimit)
	if exceedNum <= 0 {
		return sortedRevisions, nil
	}

	// The current revision must never be garbage-collected.
	live := map[string]bool{yas.Status.CurrentRevision: true}

	cleaned := make([]*apps.ControllerRevision, 0, len(*sortedRevisions)-exceedNum)
	for i, revision := range *sortedRevisions {
		// Keep everything past the deletion window and anything live.
		if i >= exceedNum || live[revision.Name] {
			cleaned = append(cleaned, revision)
			continue
		}
		if err := r.Client.Delete(context.TODO(), revision); err != nil {
			return sortedRevisions, err
		}
	}
	return &cleaned, nil
}
// createControllerRevision creates the controller revision owned by the parent.
// On a name collision with different data it bumps collisionCount (which feeds
// the name hash) and retries; on a collision with identical data it reuses the
// existing revision.
func (r *ReconcileYurtAppSet) createControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) {
	if collisionCount == nil {
		return nil, fmt.Errorf("collisionCount should not be nil")
	}

	// Clone the input
	clone := revision.DeepCopy()

	var err error
	// Continue to attempt to create the revision updating the name with a new hash on each iteration
	for {
		hash := history.HashControllerRevision(revision, collisionCount)
		// Update the revisions name
		clone.Name = history.ControllerRevisionName(parent.GetName(), hash)
		err = r.Client.Create(context.TODO(), clone)
		if errors.IsAlreadyExists(err) {
			exists := &apps.ControllerRevision{}
			err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: parent.GetNamespace(), Name: clone.Name}, exists)
			if err != nil {
				return nil, err
			}
			// Same name, same data: the revision already exists; reuse it.
			if bytes.Equal(exists.Data.Raw, clone.Data.Raw) {
				return exists, nil
			}
			// Hash collision with different data: bump the count and retry.
			*collisionCount++
			continue
		}
		return clone, err
	}
}
// newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set.
// The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned
// ControllerRevision is valid. Revisions are stored as patches that re-apply the current workload template
// via a strategic merge patch.
func (r *ReconcileYurtAppSet) newRevision(yas *appsalphav1.YurtAppSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
	patch, err := getYurtAppSetPatch(yas)
	if err != nil {
		return nil, err
	}
	gvk, err := apiutil.GVKForObject(yas, r.scheme)
	if err != nil {
		return nil, err
	}

	// Take the labels of whichever workload template is configured; exactly
	// one of them must be set.
	var selectedLabels map[string]string
	if yas.Spec.WorkloadTemplate.StatefulSetTemplate != nil {
		selectedLabels = yas.Spec.WorkloadTemplate.StatefulSetTemplate.Labels
	} else if yas.Spec.WorkloadTemplate.DeploymentTemplate != nil {
		selectedLabels = yas.Spec.WorkloadTemplate.DeploymentTemplate.Labels
	} else {
		klog.Errorf("YurtAppSet(%s/%s) need specific WorkloadTemplate", yas.GetNamespace(), yas.GetName())
		return nil, fmt.Errorf("YurtAppSet(%s/%s) need specific WorkloadTemplate", yas.GetNamespace(), yas.GetName())
	}

	cr, err := history.NewControllerRevision(yas,
		gvk,
		selectedLabels,
		runtime.RawExtension{Raw: patch},
		revision,
		collisionCount)
	if err != nil {
		return nil, err
	}
	cr.Namespace = yas.Namespace
	return cr, nil
}
// nextRevision finds the next valid revision number based on revisions. If the length of revisions
// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
// assumes that revisions has been sorted by Revision.
func nextRevision(revisions []*apps.ControllerRevision) int64 {
	if len(revisions) == 0 {
		return 1
	}
	return revisions[len(revisions)-1].Revision + 1
}
// getYurtAppSetPatch returns a strategic-merge patch that replaces
// spec.workloadTemplate of the YurtAppSet with its current state. The patch is
// stored in a ControllerRevision so the template can be re-applied later; the
// "$patch": "replace" directive makes the merge replace the whole template
// instead of merging field-by-field.
func getYurtAppSetPatch(yas *appsalphav1.YurtAppSet) ([]byte, error) {
	dsBytes, err := json.Marshal(yas)
	if err != nil {
		return nil, err
	}
	var raw map[string]interface{}
	if err := json.Unmarshal(dsBytes, &raw); err != nil {
		return nil, err
	}

	// Bug fix: these used to be unchecked type assertions that panicked on a
	// malformed object; report an error instead.
	spec, ok := raw["spec"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("YurtAppSet %s/%s: marshaled object has no spec map", yas.Namespace, yas.Name)
	}
	template, ok := spec["workloadTemplate"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("YurtAppSet %s/%s: spec has no workloadTemplate map", yas.Namespace, yas.Name)
	}
	template["$patch"] = "replace"

	// Build a patch object containing only spec.workloadTemplate.
	objCopy := map[string]interface{}{
		"spec": map[string]interface{}{
			"workloadTemplate": template,
		},
	}
	return json.Marshal(objCopy)
}
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@CHANGELOG
OpenYurt Authors:
change uniteddeployment reconcile
*/
package yurtappset
import (
"context"
"flag"
"fmt"
"reflect"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/openyurtio/openyurt/cmd/yurt-manager/app/config"
"github.com/openyurtio/openyurt/cmd/yurt-manager/names"
unitv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappset/adapter"
)
// init registers the controller's command-line flags.
func init() {
	flag.IntVar(&concurrentReconciles, "yurtappset-workers", concurrentReconciles, "Max concurrent workers for YurtAppSet controller.")
}
var (
	// concurrentReconciles is the max number of concurrent reconcile workers;
	// it can be overridden with the yurtappset-workers flag (see init).
	concurrentReconciles = 3
	// controllerResource identifies the yurtappsets resource this controller manages.
	controllerResource = unitv1alpha1.SchemeGroupVersion.WithResource("yurtappsets")
)
const (
	// Event reasons emitted on YurtAppSet objects during reconciliation.
	eventTypeRevisionProvision  = "RevisionProvision"
	eventTypeFindPools          = "FindPools"
	eventTypeDupPoolsDelete     = "DeleteDuplicatedPools"
	eventTypePoolsUpdate        = "UpdatePool"
	eventTypeTemplateController = "TemplateController"

	// NOTE(review): presumably the initial batch size for slow-start
	// create/delete operations — confirm at the usage sites.
	slowStartInitialBatchSize = 1
)
// Format renders the message and prefixes it with the YurtAppSet controller
// name, e.g. "yurtappset-controller: <message>".
func Format(format string, args ...interface{}) string {
	msg := fmt.Sprintf(format, args...)
	return fmt.Sprintf("%s: %s", names.YurtAppSetController, msg)
}
// Add creates a new YurtAppSet Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
// It bails out early when the yurtappsets resource is not served by the API server.
func Add(ctx context.Context, c *config.CompletedConfig, mgr manager.Manager) error {
	if _, err := mgr.GetRESTMapper().KindFor(controllerResource); err != nil {
		klog.Infof("resource %s doesn't exist", controllerResource.String())
		return err
	}
	klog.Infof("yurtappset-controller add controller %s", controllerResource.String())
	return add(mgr, newReconciler(c, mgr))
}
// newReconciler returns a new reconcile.Reconciler wired with one PoolControl
// (and matching adapter) per supported workload template type.
func newReconciler(c *config.CompletedConfig, mgr manager.Manager) reconcile.Reconciler {
	return &ReconcileYurtAppSet{
		Client:   mgr.GetClient(),
		scheme:   mgr.GetScheme(),
		recorder: mgr.GetEventRecorderFor(names.YurtAppSetController),
		poolControls: map[unitv1alpha1.TemplateType]ControlInterface{
			unitv1alpha1.StatefulSetTemplateType: &PoolControl{Client: mgr.GetClient(), scheme: mgr.GetScheme(),
				adapter: &adapter.StatefulSetAdapter{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}},
			unitv1alpha1.DeploymentTemplateType: &PoolControl{Client: mgr.GetClient(), scheme: mgr.GetScheme(),
				adapter: &adapter.DeploymentAdapter{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}},
		},
	}
}
// add adds a new Controller to mgr with r as the reconcile.Reconciler, and
// registers watches on YurtAppSet objects plus the StatefulSets and
// Deployments they own.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	// Create a new controller
	c, err := controller.New(names.YurtAppSetController, mgr, controller.Options{Reconciler: r, MaxConcurrentReconciles: concurrentReconciles})
	if err != nil {
		return err
	}

	// Watch for changes to YurtAppSet
	err = c.Watch(&source.Kind{Type: &unitv1alpha1.YurtAppSet{}}, &handler.EnqueueRequestForObject{})
	if err != nil {
		return err
	}

	// Enqueue the owning YurtAppSet when an owned StatefulSet changes.
	err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
		IsController: true,
		OwnerType:    &unitv1alpha1.YurtAppSet{},
	})
	if err != nil {
		return err
	}

	// Enqueue the owning YurtAppSet when an owned Deployment changes.
	err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
		IsController: true,
		OwnerType:    &unitv1alpha1.YurtAppSet{},
	})
	if err != nil {
		return err
	}

	return nil
}
// Compile-time check that ReconcileYurtAppSet implements reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcileYurtAppSet{}

// ReconcileYurtAppSet reconciles a YurtAppSet object
type ReconcileYurtAppSet struct {
	client.Client
	// scheme is used for owner references and GVK lookups.
	scheme *runtime.Scheme
	// recorder emits Kubernetes events on the YurtAppSet being reconciled.
	recorder record.EventRecorder
	// poolControls maps each workload template type to its pool controller.
	poolControls map[unitv1alpha1.TemplateType]ControlInterface
}
// +kubebuilder:rbac:groups=apps.openyurt.io,resources=yurtappsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.openyurt.io,resources=yurtappsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
// Reconcile reads the state of the cluster for a YurtAppSet object and makes
// changes based on the state read and what is in the YurtAppSet.Spec.
// Flow: fetch the object, construct controller revisions, resolve the control
// for the configured workload template, reconcile all pools, then persist the
// recalculated status.
func (r *ReconcileYurtAppSet) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("Reconcile YurtAppSet %s/%s", request.Namespace, request.Name)
// Fetch the YurtAppSet instance
instance := &unitv1alpha1.YurtAppSet{}
err := r.Get(context.TODO(), request.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
// Object deleted after the request was enqueued; nothing to do.
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
if instance.DeletionTimestamp != nil {
// Object is being deleted; owned workloads are left to garbage collection.
return reconcile.Result{}, nil
}
// Snapshot the status so updateStatus can detect whether anything changed.
oldStatus := instance.Status.DeepCopy()
// Build the current/updated ControllerRevisions used to track template changes.
currentRevision, updatedRevision, collisionCount, err := r.constructYurtAppSetRevisions(instance)
if err != nil {
klog.Errorf("could not construct controller revision of YurtAppSet %s/%s: %s", instance.Namespace, instance.Name, err)
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeRevisionProvision), err.Error())
return reconcile.Result{}, err
}
// Pick the ControlInterface matching the configured workload template type.
control, poolType, err := r.getPoolControls(instance)
if err != nil {
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeTemplateController), err.Error())
return reconcile.Result{}, err
}
klog.V(4).Infof("Get YurtAppSet %s/%s all pools", request.Namespace, request.Name)
nameToPool, err := r.getNameToPool(instance, control)
if err != nil {
klog.Errorf("could not get Pools of YurtAppSet %s/%s: %s", instance.Namespace, instance.Name, err)
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s",
eventTypeFindPools), err.Error())
// NOTE(review): the error is swallowed here (nil returned), so a failed pool
// listing ends this reconcile without workqueue error backoff — confirm this
// is intentional.
return reconcile.Result{}, nil
}
// Per-pool replica/patch overrides declared in Spec.Topology.Pools.
nextPatches := GetNextPatches(instance)
klog.V(4).Infof("Get YurtAppSet %s/%s next Patches %v", instance.Namespace, instance.Name, nextPatches)
// Prefer the updated revision when one exists; otherwise stay on current.
expectedRevision := currentRevision
if updatedRevision != nil {
expectedRevision = updatedRevision
}
newStatus, err := r.managePools(instance, nameToPool, nextPatches, expectedRevision, poolType)
if err != nil {
// Update failure is recorded but status is still written below.
klog.Errorf("could not update YurtAppSet %s/%s: %s", instance.Namespace, instance.Name, err)
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypePoolsUpdate), err.Error())
}
return r.updateStatus(instance, newStatus, oldStatus, nameToPool, expectedRevision, collisionCount, control)
}
// getNameToPool lists every Pool owned by the YurtAppSet, removes duplicates
// that share the same pool name, and returns the surviving Pool per name.
// Failures are surfaced both as warning Events and as returned errors.
func (r *ReconcileYurtAppSet) getNameToPool(instance *unitv1alpha1.YurtAppSet, control ControlInterface) (map[string]*Pool, error) {
	allPools, listErr := control.GetAllPools(instance)
	if listErr != nil {
		r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeFindPools), listErr.Error())
		return nil, fmt.Errorf("could not get all Pools for YurtAppSet %s/%s: %s", instance.Namespace, instance.Name, listErr)
	}

	klog.V(4).Infof("Classify YurtAppSet %s/%s by pool name", instance.Namespace, instance.Name)
	grouped := r.classifyPoolByPoolName(allPools)

	deduped, dedupErr := r.deleteDupPool(grouped, control)
	if dedupErr != nil {
		r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeDupPoolsDelete), dedupErr.Error())
		return nil, fmt.Errorf("could not manage duplicate Pool of YurtAppSet %s/%s: %s", instance.Namespace, instance.Name, dedupErr)
	}
	return deduped, nil
}
// deleteDupPool keeps exactly one Pool per pool name and deletes any extras.
// The first Pool in each group survives; already-deleted duplicates (NotFound)
// are ignored. It returns the name→Pool map built so far together with the
// first non-NotFound deletion error, if any.
func (r *ReconcileYurtAppSet) deleteDupPool(nameToPools map[string][]*Pool, control ControlInterface) (map[string]*Pool, error) {
	nameToPool := map[string]*Pool{}
	for name, pools := range nameToPools {
		if len(pools) > 1 {
			for _, pool := range pools[1:] {
				// klog.V(0) is always enabled, so log unconditionally via Infof.
				klog.Infof("Delete duplicated Pool %s/%s for pool name %s", pool.Namespace, pool.Name, name)
				if err := control.DeletePool(pool); err != nil {
					if errors.IsNotFound(err) {
						// Duplicate already gone: treat as successfully deleted.
						continue
					}
					return nameToPool, err
				}
			}
		}
		if len(pools) > 0 {
			nameToPool[name] = pools[0]
		}
	}
	return nameToPool, nil
}
// getPoolControls resolves which ControlInterface and template type to use
// based on the workload template configured on the YurtAppSet. Exactly one of
// StatefulSetTemplate/DeploymentTemplate must be set; otherwise an error is
// returned.
func (r *ReconcileYurtAppSet) getPoolControls(instance *unitv1alpha1.YurtAppSet) (ControlInterface,
	unitv1alpha1.TemplateType, error) {
	switch {
	case instance.Spec.WorkloadTemplate.StatefulSetTemplate != nil:
		return r.poolControls[unitv1alpha1.StatefulSetTemplateType], unitv1alpha1.StatefulSetTemplateType, nil
	case instance.Spec.WorkloadTemplate.DeploymentTemplate != nil:
		return r.poolControls[unitv1alpha1.DeploymentTemplateType], unitv1alpha1.DeploymentTemplateType, nil
	default:
		klog.Errorf("The appropriate WorkloadTemplate was not found")
		// Error strings must not be capitalized (staticcheck ST1005).
		return nil, "", fmt.Errorf("the appropriate WorkloadTemplate was not found, Now Support(%s/%s)",
			unitv1alpha1.StatefulSetTemplateType, unitv1alpha1.DeploymentTemplateType)
	}
}
// classifyPoolByPoolName groups the given Pools by their Name, so duplicates
// sharing one pool name end up in the same slice.
func (r *ReconcileYurtAppSet) classifyPoolByPoolName(pools []*Pool) map[string][]*Pool {
	grouped := make(map[string][]*Pool)
	for _, p := range pools {
		grouped[p.Name] = append(grouped[p.Name], p)
	}
	return grouped
}
// updateStatus recomputes the derived status fields and persists them when
// they differ from the previously observed status.
func (r *ReconcileYurtAppSet) updateStatus(instance *unitv1alpha1.YurtAppSet, newStatus, oldStatus *unitv1alpha1.YurtAppSetStatus,
	nameToPool map[string]*Pool, currentRevision *appsv1.ControllerRevision,
	collisionCount int32, control ControlInterface) (reconcile.Result, error) {
	calculated := r.calculateStatus(instance, newStatus, nameToPool, currentRevision, collisionCount, control)
	if _, err := r.updateYurtAppSet(instance, oldStatus, calculated); err != nil {
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}
// calculateStatus fills newStatus with the aggregate state of all pools:
// collision count, current revision, OverriderRef, per-pool replica counts,
// workload summaries, total/ready replicas, template type, and the PoolFailure
// condition. It mutates and returns newStatus.
func (r *ReconcileYurtAppSet) calculateStatus(instance *unitv1alpha1.YurtAppSet, newStatus *unitv1alpha1.YurtAppSetStatus,
nameToPool map[string]*Pool, currentRevision *appsv1.ControllerRevision,
collisionCount int32, control ControlInterface) *unitv1alpha1.YurtAppSetStatus {
newStatus.CollisionCount = &collisionCount
if newStatus.CurrentRevision == "" {
// init with current revision
newStatus.CurrentRevision = currentRevision.Name
}
// update OverriderRef: record the first YurtAppOverrider targeting this set.
// A list failure is stored in poolFailure and surfaces as a PoolFailure
// condition below unless a pool failure later overwrites it.
var poolFailure *string
overriderList := unitv1alpha1.YurtAppOverriderList{}
if err := r.List(context.TODO(), &overriderList); err != nil {
message := fmt.Sprintf("could not list yurtappoverrider: %v", err)
poolFailure = &message
}
for _, overrider := range overriderList.Items {
if overrider.Subject.Kind == "YurtAppSet" && overrider.Subject.Name == instance.Name {
newStatus.OverriderRef = overrider.Name
break
}
}
// sync from status: rebuild the aggregates from scratch on every pass.
newStatus.WorkloadSummaries = make([]unitv1alpha1.WorkloadSummary, 0)
newStatus.PoolReplicas = make(map[string]int32)
newStatus.ReadyReplicas = 0
newStatus.Replicas = 0
for _, pool := range nameToPool {
newStatus.PoolReplicas[pool.Name] = pool.Status.Replicas
newStatus.WorkloadSummaries = append(newStatus.WorkloadSummaries, unitv1alpha1.WorkloadSummary{
AvailableCondition: pool.Status.AvailableCondition,
Replicas: pool.Status.Replicas,
ReadyReplicas: pool.Status.ReadyReplicas,
WorkloadName: pool.Spec.PoolRef.GetName(),
})
newStatus.Replicas += pool.Status.Replicas
newStatus.ReadyReplicas += pool.Status.ReadyReplicas
}
newStatus.TemplateType = getPoolTemplateType(instance)
// The first pool reporting a failure wins; map iteration order is random, so
// which failure is reported may vary between reconciles.
for _, pool := range nameToPool {
failureMessage := control.GetPoolFailure(pool)
if failureMessage != nil {
poolFailure = failureMessage
break
}
}
if poolFailure == nil {
RemoveYurtAppSetCondition(newStatus, unitv1alpha1.PoolFailure)
} else {
SetYurtAppSetCondition(newStatus, NewYurtAppSetCondition(unitv1alpha1.PoolFailure, corev1.ConditionTrue, "Error", *poolFailure))
}
return newStatus
}
// getPoolTemplateType reports which workload template is configured on the
// YurtAppSet. It returns the empty type (and logs a warning) when neither
// StatefulSetTemplate nor DeploymentTemplate is set.
func getPoolTemplateType(obj *unitv1alpha1.YurtAppSet) (templateType unitv1alpha1.TemplateType) {
	template := obj.Spec.WorkloadTemplate
	if template.StatefulSetTemplate != nil {
		return unitv1alpha1.StatefulSetTemplateType
	}
	if template.DeploymentTemplate != nil {
		return unitv1alpha1.DeploymentTemplateType
	}
	klog.Warning("YurtAppSet.Spec.WorkloadTemplate exist wrong template")
	return
}
// updateYurtAppSet writes newStatus to the YurtAppSet's status subresource,
// retrying up to updateRetries times on conflict by re-fetching the object.
// It is a no-op when nothing observable changed between oldStatus and newStatus.
func (r *ReconcileYurtAppSet) updateYurtAppSet(yas *unitv1alpha1.YurtAppSet, oldStatus, newStatus *unitv1alpha1.YurtAppSetStatus) (*unitv1alpha1.YurtAppSet, error) {
	// BUG FIX: CollisionCount is a *int32. The previous code compared the
	// pointers with ==, which is false whenever both sides are non-nil (each
	// status carries its own freshly-allocated pointer), so the no-change
	// short-circuit almost never fired and status was updated every reconcile.
	// Compare the pointed-to values (nil-safe) with reflect.DeepEqual instead.
	if oldStatus.CurrentRevision == newStatus.CurrentRevision &&
		reflect.DeepEqual(oldStatus.CollisionCount, newStatus.CollisionCount) &&
		oldStatus.Replicas == newStatus.Replicas &&
		oldStatus.ReadyReplicas == newStatus.ReadyReplicas &&
		yas.Generation == newStatus.ObservedGeneration &&
		reflect.DeepEqual(oldStatus.WorkloadSummaries, newStatus.WorkloadSummaries) &&
		reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) {
		return yas, nil
	}

	newStatus.ObservedGeneration = yas.Generation

	var getErr, updateErr error
	for i, obj := 0, yas; ; i++ {
		klog.V(4).Infof("The %d th time updating status for %v: %s/%s, sequence No: %v->%v",
			i, obj.Kind, obj.Namespace, obj.Name, obj.Status.ObservedGeneration, newStatus.ObservedGeneration)
		obj.Status = *newStatus
		updateErr = r.Client.Status().Update(context.TODO(), obj)
		if updateErr == nil {
			return obj, nil
		}
		if i >= updateRetries {
			break
		}
		// Conflict (or other) failure: re-read the latest object and retry.
		tmpObj := &unitv1alpha1.YurtAppSet{}
		if getErr = r.Client.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, tmpObj); getErr != nil {
			return nil, getErr
		}
		obj = tmpObj
	}
	klog.Errorf("could not update YurtAppSet %s/%s status: %s", yas.Namespace, yas.Name, updateErr)
	return nil, updateErr
}
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@CHANGELOG
OpenYurt Authors:
change some functions
*/
package yurtappset
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/openyurtio/openyurt/pkg/apis/apps"
unitv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
)
// updateRetries is the maximum number of conflict retries when writing the
// YurtAppSet status subresource.
const updateRetries = 5
// YurtAppSetPatches carries the per-pool overrides extracted from
// Spec.Topology.Pools: the desired replica count and an optional raw patch.
type YurtAppSetPatches struct {
// Replicas is the desired replica count for the pool (zero when unset).
Replicas int32
// Patch is the raw patch body for the pool's workload (empty when unset).
Patch string
}
// getPoolNameFrom extracts the pool name from the object's pool-name label.
// It returns an error when the label is absent or has an empty value.
func getPoolNameFrom(metaObj metav1.Object) (string, error) {
	poolName, found := metaObj.GetLabels()[apps.PoolNameLabelKey]
	switch {
	case !found:
		return "", fmt.Errorf("could not get pool name from label of pool %s/%s: no label %s found", metaObj.GetNamespace(), metaObj.GetName(), apps.PoolNameLabelKey)
	case len(poolName) == 0:
		return "", fmt.Errorf("could not get pool name from label of pool %s/%s: label %s has an empty value", metaObj.GetNamespace(), metaObj.GetName(), apps.PoolNameLabelKey)
	default:
		return poolName, nil
	}
}
// NewYurtAppSetCondition creates a new YurtAppSet condition of the given type,
// stamped with the current time.
func NewYurtAppSetCondition(condType unitv1alpha1.YurtAppSetConditionType, status corev1.ConditionStatus, reason, message string) *unitv1alpha1.YurtAppSetCondition {
	cond := unitv1alpha1.YurtAppSetCondition{
		Type:               condType,
		Status:             status,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}
	return &cond
}
// GetYurtAppSetCondition returns the condition with the provided type, or nil
// when no such condition exists. The returned pointer refers to a copy, so
// mutating it does not modify status.Conditions.
func GetYurtAppSetCondition(status unitv1alpha1.YurtAppSetStatus, condType unitv1alpha1.YurtAppSetConditionType) *unitv1alpha1.YurtAppSetCondition {
	for _, cond := range status.Conditions {
		if cond.Type == condType {
			found := cond
			return &found
		}
	}
	return nil
}
// SetYurtAppSetCondition updates the YurtAppSet status to include the provided
// condition. When an existing condition already has the same status and reason
// nothing is changed; when only the status matches, the original transition
// timestamp is preserved on the replacement.
func SetYurtAppSetCondition(status *unitv1alpha1.YurtAppSetStatus, condition *unitv1alpha1.YurtAppSetCondition) {
	existing := GetYurtAppSetCondition(*status, condition.Type)
	if existing != nil && existing.Status == condition.Status {
		if existing.Reason == condition.Reason {
			return
		}
		// Status did not flip: keep the earlier transition time.
		condition.LastTransitionTime = existing.LastTransitionTime
	}
	status.Conditions = append(filterOutCondition(status.Conditions, condition.Type), *condition)
}
// RemoveYurtAppSetCondition drops every condition of the given type from the
// YurtAppSet status.
func RemoveYurtAppSetCondition(status *unitv1alpha1.YurtAppSetStatus, condType unitv1alpha1.YurtAppSetConditionType) {
	filtered := filterOutCondition(status.Conditions, condType)
	status.Conditions = filtered
}
// filterOutCondition returns the conditions with every entry of condType
// removed; the input slice is not modified.
func filterOutCondition(conditions []unitv1alpha1.YurtAppSetCondition, condType unitv1alpha1.YurtAppSetConditionType) []unitv1alpha1.YurtAppSetCondition {
	var kept []unitv1alpha1.YurtAppSetCondition
	for _, cond := range conditions {
		if cond.Type != condType {
			kept = append(kept, cond)
		}
	}
	return kept
}
// GetNextPatches collects, per pool name, the desired replica count and raw
// patch declared in yas.Spec.Topology.Pools. Unset fields stay at their zero
// values.
func GetNextPatches(yas *unitv1alpha1.YurtAppSet) map[string]YurtAppSetPatches {
	next := map[string]YurtAppSetPatches{}
	for _, pool := range yas.Spec.Topology.Pools {
		var patches YurtAppSetPatches
		if pool.Replicas != nil {
			patches.Replicas = *pool.Replicas
		}
		if pool.Patch != nil {
			patches.Patch = string(pool.Patch.Raw)
		}
		next[pool.Name] = patches
	}
	return next
}
//go:build gofuzz
// +build gofuzz
/*
Copyright 2022 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yurtappset
import (
"context"
"fmt"
fuzz "github.com/AdaLogics/go-fuzz-headers"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"
appsv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/yurtappset/adapter"
)
var (
// fuzzCtx is the context handed to Reconcile during fuzzing.
fuzzCtx = context.Background()
// fakeSchemeForFuzzing is the runtime scheme used by the fake client below.
fakeSchemeForFuzzing = runtime.NewScheme()
)
// init registers the client-go, OpenYurt v1alpha1, and core v1 types with the
// fuzzing scheme. Registration errors are deliberately ignored: the fuzzer
// cannot proceed meaningfully without the scheme anyway.
func init() {
_ = clientgoscheme.AddToScheme(fakeSchemeForFuzzing)
_ = appsv1alpha1.AddToScheme(fakeSchemeForFuzzing)
_ = corev1.AddToScheme(fakeSchemeForFuzzing)
}
// GetUnstructured is a helper function to create an unstructured object from
// fuzzer-supplied YAML. It returns an error when the fuzzer cannot produce a
// string or the string is not valid YAML.
func GetUnstructured(f *fuzz.ConsumeFuzzer) (*unstructured.Unstructured, error) {
yamlStr, err := f.GetString()
if err != nil {
return nil, err
}
obj := make(map[string]interface{})
err = yaml.Unmarshal([]byte(yamlStr), &obj)
if err != nil {
return nil, err
}
return &unstructured.Unstructured{Object: obj}, nil
}
// validateUnstructured checks that the fuzz-generated object carries the
// minimal top-level fields of a Kubernetes resource. The previous version
// repeated the same check four times and returned the uninformative message
// "invalid unstr"; the error now names the missing field.
func validateUnstructured(unstr *unstructured.Unstructured) error {
	for _, field := range []string{"kind", "apiVersion", "spec", "status"} {
		if _, ok := unstr.Object[field]; !ok {
			return fmt.Errorf("invalid unstr: missing field %q", field)
		}
	}
	return nil
}
// FuzzAppSetReconcile is a go-fuzz entry point: it builds a YurtAppSet from
// fuzzer data, wires up a fake client plus both pool controls, and drives one
// Reconcile pass. Returns 1 when the input reached Reconcile, 0 when it was
// rejected earlier (the go-fuzz convention for guiding the corpus).
func FuzzAppSetReconcile(data []byte) int {
f := fuzz.NewConsumer(data)
// The unstructured object is only used to gate inputs on well-formed YAML
// with the basic resource fields; it is not itself reconciled.
unstr, err := GetUnstructured(f)
if err != nil {
return 0
}
err = validateUnstructured(unstr)
if err != nil {
return 0
}
appset := &appsv1alpha1.YurtAppSet{}
if err := f.GenerateStruct(appset); err != nil {
return 0
}
// Fake client pre-seeded with the generated YurtAppSet.
clientFake := fake.NewClientBuilder().WithScheme(fakeSchemeForFuzzing).WithObjects(
appset,
).Build()
r := &ReconcileYurtAppSet{
Client: clientFake,
scheme: fakeSchemeForFuzzing,
recorder: record.NewFakeRecorder(10000),
poolControls: map[appsv1alpha1.TemplateType]ControlInterface{
appsv1alpha1.StatefulSetTemplateType: &PoolControl{Client: clientFake, scheme: fakeSchemeForFuzzing,
adapter: &adapter.StatefulSetAdapter{Client: clientFake, Scheme: fakeSchemeForFuzzing}},
appsv1alpha1.DeploymentTemplateType: &PoolControl{Client: clientFake, scheme: fakeSchemeForFuzzing,
adapter: &adapter.DeploymentAdapter{Client: clientFake, Scheme: fakeSchemeForFuzzing}},
},
}
// Errors from Reconcile are ignored: the fuzzer only looks for panics.
_, _ = r.Reconcile(fuzzCtx, reconcile.Request{NamespacedName: types.NamespacedName{Name: appset.Name, Namespace: appset.Namespace}})
return 1
}
/*
Copyright 2021 The OpenYurt Authors.
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@CHANGELOG
OpenYurt Authors:
Subset to pool
*/
package yurtappset
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
unitv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/apps/v1alpha1"
"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util"
)
// managePools first provisions (creates/deletes) pool workloads to match
// yas.Spec.Topology.Pools, then updates every surviving pool whose revision,
// replicas, or patch deviates from the desired state. It returns a status
// copy with the PoolProvisioned/PoolUpdated conditions set, plus any update
// error (provisioning errors are returned immediately).
func (r *ReconcileYurtAppSet) managePools(yas *unitv1alpha1.YurtAppSet,
nameToPool map[string]*Pool, nextPatches map[string]YurtAppSetPatches,
expectedRevision *appsv1.ControllerRevision,
poolType unitv1alpha1.TemplateType) (newStatus *unitv1alpha1.YurtAppSetStatus, updateErr error) {
newStatus = yas.Status.DeepCopy()
exists, provisioned, err := r.managePoolProvision(yas, nameToPool, nextPatches, expectedRevision, poolType)
if err != nil {
SetYurtAppSetCondition(newStatus, NewYurtAppSetCondition(unitv1alpha1.PoolProvisioned, corev1.ConditionFalse, "Error", err.Error()))
return newStatus, fmt.Errorf("could not manage Pool provision: %s", err)
}
if provisioned {
SetYurtAppSetCondition(newStatus, NewYurtAppSetCondition(unitv1alpha1.PoolProvisioned, corev1.ConditionTrue, "", ""))
}
// Collect pools whose revision, replica count, or patch differs from the
// desired state.
// NOTE(review): IsExpected returning true triggers an update here, which
// suggests it reports "pool deviates from the expected revision" — confirm
// against the ControlInterface implementation.
var needUpdate []string
for _, name := range exists.List() {
pool := nameToPool[name]
if r.poolControls[poolType].IsExpected(pool, expectedRevision.Name) ||
pool.Status.ReplicasInfo.Replicas != nextPatches[name].Replicas ||
pool.Status.PatchInfo != nextPatches[name].Patch {
needUpdate = append(needUpdate, name)
}
}
if len(needUpdate) > 0 {
// Update pools in exponentially growing batches; the first failure aborts
// subsequent batches.
_, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error {
cell := needUpdate[index]
pool := nameToPool[cell]
replicas := nextPatches[cell].Replicas
klog.Infof("YurtAppSet %s/%s needs to update Pool (%s) %s/%s with revision %s, replicas %d ",
yas.Namespace, yas.Name, poolType, pool.Namespace, pool.Name, expectedRevision.Name, replicas)
updatePoolErr := r.poolControls[poolType].UpdatePool(pool, yas, expectedRevision.Name, replicas)
if updatePoolErr != nil {
r.recorder.Event(yas.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypePoolsUpdate), fmt.Sprintf("Error updating PodSet (%s) %s when updating: %s", poolType, pool.Name, updatePoolErr))
}
return updatePoolErr
})
}
if updateErr == nil {
SetYurtAppSetCondition(newStatus, NewYurtAppSetCondition(unitv1alpha1.PoolUpdated, corev1.ConditionTrue, "", ""))
} else {
SetYurtAppSetCondition(newStatus, NewYurtAppSetCondition(unitv1alpha1.PoolUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
}
return
}
// managePoolProvision reconciles the set of pool workloads with the desired
// topology: it creates pools that are declared but absent, deletes pools that
// exist but are no longer declared, and removes workloads of the other
// template kind (in case the user switched WorkloadTemplate). It returns the
// names present both in spec and in the cluster, whether any mutation
// happened, and an aggregate of all errors encountered.
func (r *ReconcileYurtAppSet) managePoolProvision(yas *unitv1alpha1.YurtAppSet,
nameToPool map[string]*Pool, nextPatches map[string]YurtAppSetPatches,
expectedRevision *appsv1.ControllerRevision, workloadType unitv1alpha1.TemplateType) (sets.String, bool, error) {
expectedPools := sets.String{}
gotPools := sets.String{}
for _, pool := range yas.Spec.Topology.Pools {
expectedPools.Insert(pool.Name)
}
for poolName := range nameToPool {
gotPools.Insert(poolName)
}
klog.V(4).Infof("YurtAppSet %s/%s has pools %v, expects pools %v", yas.Namespace, yas.Name, gotPools.List(), expectedPools.List())
// Pools declared in spec but missing from the cluster must be created.
var creates []string
for _, expectPool := range expectedPools.List() {
if gotPools.Has(expectPool) {
continue
}
creates = append(creates, expectPool)
}
// Pools present in the cluster but no longer declared must be deleted.
var deletes []string
for _, gotPool := range gotPools.List() {
if expectedPools.Has(gotPool) {
continue
}
deletes = append(deletes, gotPool)
}
revision := expectedRevision.Name
var errs []error
// manage creating
if len(creates) > 0 {
// do not consider deletion
klog.Infof("YurtAppSet %s/%s needs creating pool (%s) with name: %v", yas.Namespace, yas.Name, workloadType, creates)
// NOTE(review): createdPools is an element-by-element copy of creates and
// appears redundant — creates could be indexed directly below.
createdPools := make([]string, len(creates))
for i, pool := range creates {
createdPools[i] = pool
}
var createdNum int
var createdErr error
// Create pools in exponentially growing batches (slow start).
createdNum, createdErr = util.SlowStartBatch(len(creates), slowStartInitialBatchSize, func(idx int) error {
poolName := createdPools[idx]
replicas := nextPatches[poolName].Replicas
err := r.poolControls[workloadType].CreatePool(yas, poolName, revision, replicas)
if err != nil {
// Timeout errors are deliberately swallowed: the create may still have
// succeeded server-side and will be observed on the next reconcile.
if !errors.IsTimeout(err) {
return fmt.Errorf("could not create Pool (%s) %s: %s", workloadType, poolName, err.Error())
}
}
return nil
})
if createdErr == nil {
r.recorder.Eventf(yas.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypePoolsUpdate), "Create %d Pool (%s)", createdNum, workloadType)
} else {
errs = append(errs, createdErr)
}
}
// manage deleting
if len(deletes) > 0 {
klog.Infof("YurtAppSet %s/%s needs deleting pool (%s) with name: [%v]", yas.Namespace, yas.Name, workloadType, deletes)
var deleteErrs []error
for _, poolName := range deletes {
pool := nameToPool[poolName]
if err := r.poolControls[workloadType].DeletePool(pool); err != nil {
deleteErrs = append(deleteErrs, fmt.Errorf("could not delete Pool (%s) %s/%s for %s: %s", workloadType, pool.Namespace, pool.Name, poolName, err))
}
}
if len(deleteErrs) > 0 {
errs = append(errs, deleteErrs...)
} else {
r.recorder.Eventf(yas.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypePoolsUpdate), "Delete %d Pool (%s)", len(deletes), workloadType)
}
}
// Clean up workloads of the other template kind, since the user may have
// changed yas.Spec.WorkloadTemplate to a different type.
cleaned := false
for t, control := range r.poolControls {
if t == workloadType {
continue
}
pools, err := control.GetAllPools(yas)
if err != nil {
errs = append(errs, fmt.Errorf("could not list Pool of other type %s for YurtAppSet %s/%s: %s", t, yas.Namespace, yas.Name, err))
continue
}
for _, pool := range pools {
cleaned = true
if err := control.DeletePool(pool); err != nil {
errs = append(errs, fmt.Errorf("could not delete Pool %s of other type %s for YurtAppSet %s/%s: %s", pool.Name, t, yas.Namespace, yas.Name, err))
continue
}
}
}
return expectedPools.Intersection(gotPools), len(creates) > 0 || len(deletes) > 0 || cleaned, utilerrors.NewAggregate(errs)
}