/* Copyright 2020 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package parser import ( "errors" "io" "os" "path/filepath" "github.com/spf13/afero" ) var _ AnnotatedReadCloser = &FsReadCloser{} // FsReadCloserAnnotation annotates data for an FsReadCloser. type FsReadCloserAnnotation struct { path string position int } // FsReadCloser implements io.ReadCloser for an Afero filesystem. type FsReadCloser struct { fs afero.Fs dir string paths []string index int position int writeBreak bool wroteBreak bool } // A FilterFn filters files when the FsReadCloser walks the filesystem. // Returning true indicates the file should be skipped. Returning an error will // cause the FsReadCloser to stop walking the filesystem and return. type FilterFn func(path string, info os.FileInfo) (bool, error) // SkipPath skips files at a certain path. func SkipPath(pattern string) FilterFn { return func(path string, _ os.FileInfo) (bool, error) { return filepath.Match(pattern, path) } } // SkipDirs skips directories. func SkipDirs() FilterFn { return func(_ string, info os.FileInfo) (bool, error) { if info.IsDir() { return true, nil } return false, nil } } // SkipEmpty skips empty files. func SkipEmpty() FilterFn { return func(_ string, info os.FileInfo) (bool, error) { return info.Size() == 0, nil } } // SkipNotYAML skips files that do not have YAML extension. 
func SkipNotYAML() FilterFn { return func(path string, _ os.FileInfo) (bool, error) { if filepath.Ext(path) != ".yaml" && filepath.Ext(path) != ".yml" { return true, nil } return false, nil } } // NewFsReadCloser returns an FsReadCloser that implements io.ReadCloser. It // walks the filesystem ahead of time, then reads file contents when Read is // invoked. It does not follow symbolic links. func NewFsReadCloser(fs afero.Fs, dir string, fns ...FilterFn) (*FsReadCloser, error) { paths := []string{} err := afero.Walk(fs, dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } for _, fn := range fns { filter, err := fn(path, info) if err != nil { return err } if filter { return nil } } paths = append(paths, path) return nil }) return &FsReadCloser{ fs: fs, dir: dir, paths: paths, index: 0, position: 0, writeBreak: false, wroteBreak: false, }, err } func (r *FsReadCloser) Read(p []byte) (n int, err error) { if r.wroteBreak { r.index++ r.position = 0 r.wroteBreak = false n = copy(p, "\n---\n") return n, nil } if r.index == len(r.paths) { return 0, io.EOF } if r.writeBreak { n = copy(p, "\n...\n") r.writeBreak = false r.wroteBreak = true return n, nil } b, err := afero.ReadFile(r.fs, r.paths[r.index]) n = copy(p, b[r.position:]) r.position += n if errors.Is(err, io.EOF) || n == 0 { r.writeBreak = true err = nil } return n, err } // Close is a no op for an FsReadCloser. func (r *FsReadCloser) Close() error { return nil } // Annotate returns additional about the data currently being read. func (r *FsReadCloser) Annotate() any { // Index will be out of bounds if we error after the final file has been // read. index := r.index if index == len(r.paths) { index-- } return FsReadCloserAnnotation{ path: r.paths[index], position: r.position, } }
package parser

import (
	"bytes"
	"context"
	"io"
	"testing"

	"k8s.io/apimachinery/pkg/runtime"
)

// FuzzParse feeds arbitrary bytes to the package parser, checking that
// parsing never panics regardless of input. Parse errors are expected and
// deliberately ignored.
func FuzzParse(f *testing.F) {
	f.Fuzz(func(_ *testing.T, data []byte) {
		metaScheme := runtime.NewScheme()
		objScheme := runtime.NewScheme()
		parser := New(metaScheme, objScheme)
		reader := io.NopCloser(bytes.NewReader(data))
		_, _ = parser.Parse(context.Background(), reader)
	})
}
/* Copyright 2020 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package parser import ( "strings" "k8s.io/apimachinery/pkg/runtime" "github.com/crossplane/crossplane-runtime/pkg/errors" ) const ( errNilLinterFn = "linter function is nil" errOrFmt = "object did not pass any of the linters with following errors: %s" ) // A Linter lints packages. type Linter interface { Lint(l Lintable) error } // PackageLinterFn lints an entire package. If function applies a check for // multiple objects, consider using an ObjectLinterFn. type PackageLinterFn func(Lintable) error // PackageLinterFns is a convenience function to pass multiple PackageLinterFn // to a function that cannot accept variadic arguments. func PackageLinterFns(fns ...PackageLinterFn) []PackageLinterFn { return fns } // ObjectLinterFn lints an object in a package. type ObjectLinterFn func(runtime.Object) error // ObjectLinterFns is a convenience function to pass multiple ObjectLinterFn to // a function that cannot accept variadic arguments. func ObjectLinterFns(fns ...ObjectLinterFn) []ObjectLinterFn { return fns } // PackageLinter lints packages by applying package and object linter functions // to it. type PackageLinter struct { pre []PackageLinterFn perMeta []ObjectLinterFn perObject []ObjectLinterFn } // NewPackageLinter creates a new PackageLinter. 
func NewPackageLinter(pre []PackageLinterFn, perMeta, perObject []ObjectLinterFn) *PackageLinter { return &PackageLinter{ pre: pre, perMeta: perMeta, perObject: perObject, } } // Lint executes all linter functions against a package. func (l *PackageLinter) Lint(pkg Lintable) error { for _, fn := range l.pre { if err := fn(pkg); err != nil { return err } } for _, o := range pkg.GetMeta() { for _, fn := range l.perMeta { if err := fn(o); err != nil { return err } } } for _, o := range pkg.GetObjects() { for _, fn := range l.perObject { if err := fn(o); err != nil { return err } } } return nil } // Or checks that at least one of the passed linter functions does not return an // error. func Or(linters ...ObjectLinterFn) ObjectLinterFn { return func(o runtime.Object) error { var errs []string for _, l := range linters { if l == nil { return errors.New(errNilLinterFn) } err := l(o) if err == nil { return nil } errs = append(errs, err.Error()) } return errors.Errorf(errOrFmt, strings.Join(errs, ", ")) } }
/* Copyright 2020 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package parser implements a parser for Crossplane packages. package parser import ( "bufio" "context" "io" "strings" "unicode" "github.com/spf13/afero" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "github.com/crossplane/crossplane-runtime/pkg/errors" ) // Lintable defines the common API for lintable packages. type Lintable interface { // GetMeta returns metadata objects of the lintable package, such as // Provider, Configuration or Function. GetMeta() []runtime.Object // GetObjects returns objects of the lintable package. GetObjects() []runtime.Object } // AnnotatedReadCloser is a wrapper around io.ReadCloser that allows // implementations to supply additional information about data that is read. type AnnotatedReadCloser interface { io.ReadCloser Annotate() any } // ObjectCreaterTyper know how to create and determine the type of objects. type ObjectCreaterTyper interface { runtime.ObjectCreater runtime.ObjectTyper } // Package is the set of metadata and objects in a package. type Package struct { meta []runtime.Object objects []runtime.Object } // NewPackage creates a new Package. func NewPackage() *Package { return &Package{} } // GetMeta gets metadata from the package. func (p *Package) GetMeta() []runtime.Object { return p.meta } // GetObjects gets objects from the package. 
func (p *Package) GetObjects() []runtime.Object { return p.objects } // Parser is a package parser. type Parser interface { Parse(ctx context.Context, rc io.ReadCloser) (*Package, error) } // PackageParser is a Parser implementation for parsing packages. type PackageParser struct { metaScheme ObjectCreaterTyper objScheme ObjectCreaterTyper } // New returns a new PackageParser. func New(meta, obj ObjectCreaterTyper) *PackageParser { return &PackageParser{ metaScheme: meta, objScheme: obj, } } // Parse is the underlying logic for parsing packages. It first attempts to // decode objects recognized by the meta scheme, then attempts to decode objects // recognized by the object scheme. Objects not recognized by either scheme // return an error rather than being skipped. func (p *PackageParser) Parse(_ context.Context, reader io.ReadCloser) (*Package, error) { pkg := NewPackage() if reader == nil { return pkg, nil } defer func() { _ = reader.Close() }() yr := yaml.NewYAMLReader(bufio.NewReader(reader)) dm := json.NewSerializerWithOptions(json.DefaultMetaFactory, p.metaScheme, p.metaScheme, json.SerializerOptions{Yaml: true}) do := json.NewSerializerWithOptions(json.DefaultMetaFactory, p.objScheme, p.objScheme, json.SerializerOptions{Yaml: true}) for { content, err := yr.Read() if err != nil && !errors.Is(err, io.EOF) { return pkg, err } if errors.Is(err, io.EOF) { break } if isEmptyYAML(content) { continue } m, _, err := dm.Decode(content, nil, nil) if err != nil { // NOTE(hasheddan): we only try to decode with object scheme if the // error is due the object not being registered in the meta scheme. if !runtime.IsNotRegisteredError(err) { return pkg, annotateErr(err, reader) } o, _, err := do.Decode(content, nil, nil) if err != nil { return pkg, annotateErr(err, reader) } pkg.objects = append(pkg.objects, o) continue } pkg.meta = append(pkg.meta, m) } return pkg, nil } // isEmptyYAML checks whether the provided YAML can be considered empty. 
This // is useful for filtering out empty YAML documents that would otherwise // cause issues when decoded. func isEmptyYAML(y []byte) bool { for _, line := range strings.Split(string(y), "\n") { trimmed := strings.TrimLeftFunc(line, unicode.IsSpace) // We don't want to return an empty document with only separators that // have nothing in-between. if trimmed != "" && trimmed != "---" && trimmed != "..." && !strings.HasPrefix(trimmed, "#") { return false } } return true } // annotateErr annotates an error if the reader is an AnnotatedReadCloser. func annotateErr(err error, reader io.ReadCloser) error { if anno, ok := reader.(AnnotatedReadCloser); ok { return errors.Wrapf(err, "%+v", anno.Annotate()) } return err } // BackendOption modifies the parser backend. Backends may accept options at // creation time, but must accept them at initialization. type BackendOption func(Backend) // Backend provides a source for a parser. type Backend interface { Init(ctx context.Context, o ...BackendOption) (io.ReadCloser, error) } // PodLogBackend is a parser backend that uses Kubernetes pod logs as source. type PodLogBackend struct { client kubernetes.Interface name string namespace string } // NewPodLogBackend returns a new PodLogBackend. func NewPodLogBackend(bo ...BackendOption) *PodLogBackend { p := &PodLogBackend{} for _, o := range bo { o(p) } return p } // Init initializes a PodLogBackend. func (p *PodLogBackend) Init(ctx context.Context, bo ...BackendOption) (io.ReadCloser, error) { for _, o := range bo { o(p) } logs := p.client.CoreV1().Pods(p.namespace).GetLogs(p.name, &corev1.PodLogOptions{}) reader, err := logs.Stream(ctx) if err != nil { return nil, err } return reader, nil } // PodName sets the pod name of a PodLogBackend. func PodName(name string) BackendOption { return func(p Backend) { pl, ok := p.(*PodLogBackend) if !ok { return } pl.name = name } } // PodNamespace sets the pod namespace of a PodLogBackend. 
func PodNamespace(namespace string) BackendOption { return func(p Backend) { pl, ok := p.(*PodLogBackend) if !ok { return } pl.namespace = namespace } } // PodClient sets the pod client of a PodLogBackend. func PodClient(client kubernetes.Interface) BackendOption { return func(p Backend) { pl, ok := p.(*PodLogBackend) if !ok { return } pl.client = client } } // NopBackend is a parser backend with empty source. type NopBackend struct{} // NewNopBackend returns a new NopBackend. func NewNopBackend(...BackendOption) *NopBackend { return &NopBackend{} } // Init initializes a NopBackend. func (p *NopBackend) Init(_ context.Context, _ ...BackendOption) (io.ReadCloser, error) { return nil, nil } // FsBackend is a parser backend that uses a filestystem as source. type FsBackend struct { fs afero.Fs dir string skips []FilterFn } // NewFsBackend returns an FsBackend. func NewFsBackend(fs afero.Fs, bo ...BackendOption) *FsBackend { f := &FsBackend{ fs: fs, } for _, o := range bo { o(f) } return f } // Init initializes an FsBackend. func (p *FsBackend) Init(_ context.Context, bo ...BackendOption) (io.ReadCloser, error) { for _, o := range bo { o(p) } return NewFsReadCloser(p.fs, p.dir, p.skips...) } // FsDir sets the directory of an FsBackend. func FsDir(dir string) BackendOption { return func(p Backend) { f, ok := p.(*FsBackend) if !ok { return } f.dir = dir } } // FsFilters adds FilterFns to an FsBackend. func FsFilters(skips ...FilterFn) BackendOption { return func(p Backend) { f, ok := p.(*FsBackend) if !ok { return } f.skips = skips } } // EchoBackend is a parser backend that uses string input as source. type EchoBackend struct { echo string } // NewEchoBackend returns a new EchoBackend. func NewEchoBackend(echo string) Backend { return &EchoBackend{ echo: echo, } } // Init initializes an EchoBackend. 
func (p *EchoBackend) Init(_ context.Context, bo ...BackendOption) (io.ReadCloser, error) { for _, o := range bo { o(p) } return io.NopCloser(strings.NewReader(p.echo)), nil }
/* Copyright 2024 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package claim import ( "context" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/crossplane/crossplane-runtime/pkg/errors" "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/crossplane/crossplane/internal/xresource" ) // Error strings. const ( errGetSecret = "cannot get composite resource's connection secret" errSecretConflict = "cannot establish control of existing connection secret" errCreateOrUpdateSecret = "cannot create or update connection secret" ) // An APIConnectionPropagator propagates connection details by reading // them from and writing them to a Kubernetes API server. type APIConnectionPropagator struct { client resource.ClientApplicator } // NewAPIConnectionPropagator returns a new APIConnectionPropagator. func NewAPIConnectionPropagator(c client.Client) *APIConnectionPropagator { return &APIConnectionPropagator{ client: resource.ClientApplicator{Client: c, Applicator: resource.NewAPIUpdatingApplicator(c)}, } } // PropagateConnection details from the supplied resource. 
func (a *APIConnectionPropagator) PropagateConnection(ctx context.Context, to xresource.LocalConnectionSecretOwner, from xresource.ConnectionSecretOwner) (bool, error) { // Either from does not expose a connection secret, or to does not want one. if from.GetWriteConnectionSecretToReference() == nil || to.GetWriteConnectionSecretToReference() == nil { return false, nil } n := types.NamespacedName{ Namespace: from.GetWriteConnectionSecretToReference().Namespace, Name: from.GetWriteConnectionSecretToReference().Name, } fs := &corev1.Secret{} if err := a.client.Get(ctx, n, fs); err != nil { return false, errors.Wrap(err, errGetSecret) } // Make sure 'from' is the controller of the connection secret it references // before we propagate it. This ensures a resource cannot use Crossplane to // circumvent RBAC by propagating a secret it does not own. if c := metav1.GetControllerOf(fs); c == nil || c.UID != from.GetUID() { return false, errors.New(errSecretConflict) } ts := xresource.LocalConnectionSecretFor(to, to.GetObjectKind().GroupVersionKind()) ts.Data = fs.Data err := a.client.Apply(ctx, ts, resource.ConnectionSecretMustBeControllableBy(to.GetUID()), resource.AllowUpdateIf(func(current, desired runtime.Object) bool { // We consider the update to be a no-op and don't allow it if the // current and existing secret data are identical. //nolint:forcetypeassert // These will always be secrets. return !cmp.Equal(current.(*corev1.Secret).Data, desired.(*corev1.Secret).Data, cmpopts.EquateEmpty()) }), ) if resource.IsNotAllowed(err) { // The update was not allowed because it was a no-op. return false, nil } if err != nil { return false, errors.Wrap(err, errCreateOrUpdateSecret) } return true, nil }
/* Copyright 2023 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package claim import ( "context" "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/crossplane/crossplane-runtime/pkg/resource/fake" "github.com/crossplane/crossplane-runtime/pkg/test" ) func FuzzPropagateConnection(f *testing.F) { f.Fuzz(func(_ *testing.T, data []byte) { f := fuzz.NewConsumer(data) cp := &fake.Composite{} cm := &fake.CompositeClaim{} err := f.GenerateStruct(cp) if err != nil { return } err = f.GenerateStruct(cm) if err != nil { return } mgcsdata := make(map[string][]byte) err = f.FuzzMap(&mgcsdata) if err != nil { return } c := resource.ClientApplicator{ Client: &test.MockClient{ MockGet: test.NewMockGetFn(nil, func(o client.Object) error { // The managed secret has some data when we get it. s := resource.ConnectionSecretFor(cp, schema.GroupVersionKind{}) s.Data = mgcsdata *o.(*corev1.Secret) = *s return nil }), }, Applicator: resource.ApplyFn(func(_ context.Context, _ client.Object, _ ...resource.ApplyOption) error { return nil }), } api := &APIConnectionPropagator{client: c} _, _ = api.PropagateConnection(context.Background(), cm, cp) }) }
/* Copyright 2024 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package claim import ( "strings" "dario.cat/mergo" "github.com/crossplane/crossplane-runtime/pkg/errors" ) const ( errUnsupportedDstObject = "destination object was not valid object" errUnsupportedSrcObject = "source object was not valid object" ) func withoutReservedK8sEntries(a map[string]string) map[string]string { for k := range a { s := strings.Split(k, "/") if strings.HasSuffix(s[0], "kubernetes.io") || strings.HasSuffix(s[0], "k8s.io") { delete(a, k) } } return a } func withoutKeys(in map[string]any, keys ...string) map[string]any { filter := map[string]bool{} for _, k := range keys { filter[k] = true } out := map[string]any{} for k, v := range in { if filter[k] { continue } out[k] = v } return out } type mergeConfig struct { mergeOptions []func(*mergo.Config) srcfilter []string } // withMergeOptions allows custom mergo.Config options. func withMergeOptions(opts ...func(*mergo.Config)) func(*mergeConfig) { return func(config *mergeConfig) { config.mergeOptions = opts } } // withSrcFilter filters supplied keys from src map before merging. func withSrcFilter(keys ...string) func(*mergeConfig) { return func(config *mergeConfig) { config.srcfilter = keys } } // merge a src map into dst map. func merge(dst, src any, opts ...func(*mergeConfig)) error { if dst == nil || src == nil { // Nothing available to merge if dst or src are nil. 
// This can occur early on in reconciliation when the // status subresource has not been set yet. return nil } config := &mergeConfig{} for _, opt := range opts { opt(config) } dstMap, ok := dst.(map[string]any) if !ok { return errors.New(errUnsupportedDstObject) } srcMap, ok := src.(map[string]any) if !ok { return errors.New(errUnsupportedSrcObject) } return mergo.Merge(&dstMap, withoutKeys(srcMap, config.srcfilter...), config.mergeOptions...) }
/* Copyright 2019 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */

// Package claim implements composite resource claims.
package claim

import (
	"context"
	"time"

	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/conditions"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/event"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	"github.com/crossplane/crossplane/internal/names"
	"github.com/crossplane/crossplane/internal/xresource"
	"github.com/crossplane/crossplane/internal/xresource/unstructured/claim"
	"github.com/crossplane/crossplane/internal/xresource/unstructured/composite"
)

const (
	// finalizer is the finalizer added to claims so that cleanup (e.g.
	// deleting the bound composite resource) can run before deletion
	// completes.
	finalizer = "finalizer.apiextensions.crossplane.io"

	// reconcileTimeout bounds how long a single Reconcile call may run.
	reconcileTimeout = 1 * time.Minute
)

// Error strings.
const (
	errGetClaim             = "cannot get claim"
	errGetComposite         = "cannot get bound composite resource"
	errDeleteComposite      = "cannot delete bound composite resource"
	errDeleteCDs            = "cannot delete connection details"
	errRemoveFinalizer      = "cannot remove finalizer from claim"
	errAddFinalizer         = "cannot add finalizer to claim"
	errUpgradeManagedFields = "cannot upgrade composite resource's managed fields from client-side to server-side apply"
	errSync                 = "cannot bind and sync claim with composite resource"
	errPropagateCDs         = "cannot propagate connection details from composite resource"
	errUpdateClaimStatus    = "cannot update claim status"
	errFmtUnbound           = "refusing to operate on composite resource %q that is not bound to this claim: bound to claim %q"
)

// reconcilePausedMsg is the condition message used when reconciliation is
// paused via the pause annotation.
const reconcilePausedMsg = "Reconciliation (including deletion) is paused via the pause annotation"

// Event reasons.
const (
	reasonBind      event.Reason = "BindCompositeResource"
	reasonDelete    event.Reason = "DeleteCompositeResource"
	reasonPropagate event.Reason = "PropagateConnectionSecret"
	reasonPaused    event.Reason = "ReconciliationPaused"
)

// ControllerName returns the recommended name for controllers that use this
// package to reconcile a particular kind of composite resource claim.
func ControllerName(name string) string { return "claim/" + name }

// A ManagedFieldsUpgrader upgrades an object's managed fields from client-side
// apply to server-side apply. This is necessary when an object was previously
// managed using client-side apply, but should now be managed using server-side
// apply. See https://github.com/kubernetes/kubernetes/issues/99003 for details.
type ManagedFieldsUpgrader interface {
	Upgrade(ctx context.Context, obj client.Object, ssaManager string) error
}

// A CompositeSyncer binds and syncs the supplied claim with the supplied
// composite resource (XR).
type CompositeSyncer interface { Sync(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error } // A CompositeSyncerFn binds and syncs the supplied claim with the supplied // composite resource (XR). type CompositeSyncerFn func(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error // Sync the supplied claim with the supplied composite resource. func (fn CompositeSyncerFn) Sync(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error { return fn(ctx, cm, xr) } // A ConnectionPropagator is responsible for propagating information required to // connect to a resource. type ConnectionPropagator interface { PropagateConnection(ctx context.Context, to xresource.LocalConnectionSecretOwner, from xresource.ConnectionSecretOwner) (propagated bool, err error) } // A ConnectionPropagatorFn is responsible for propagating information required // to connect to a resource. type ConnectionPropagatorFn func(ctx context.Context, to xresource.LocalConnectionSecretOwner, from xresource.ConnectionSecretOwner) (propagated bool, err error) // PropagateConnection details from one resource to the other. func (fn ConnectionPropagatorFn) PropagateConnection(ctx context.Context, to xresource.LocalConnectionSecretOwner, from xresource.ConnectionSecretOwner) (propagated bool, err error) { return fn(ctx, to, from) } // A ConnectionPropagatorChain runs multiple connection propagators. type ConnectionPropagatorChain []ConnectionPropagator // PropagateConnection details from one resource to the other. // This method calls PropagateConnection for all ConnectionPropagator's in the // chain and returns propagated if at least one ConnectionPropagator propagates // the connection details but exits with an error if any of them fails without // calling the remaining ones. 
func (pc ConnectionPropagatorChain) PropagateConnection(ctx context.Context, to xresource.LocalConnectionSecretOwner, from xresource.ConnectionSecretOwner) (propagated bool, err error) { for _, p := range pc { var pg bool pg, err = p.PropagateConnection(ctx, to, from) if pg { propagated = true } if err != nil { return propagated, err } } return propagated, nil } // A DefaultsSelector copies default values from the CompositeResourceDefinition when the corresponding field // in the Claim is not set. type DefaultsSelector interface { // SelectDefaults from CompositeResourceDefinition when needed. SelectDefaults(ctx context.Context, cm xresource.Claim) error } // A DefaultsSelectorFn is responsible for copying default values from the CompositeResourceDefinition. type DefaultsSelectorFn func(ctx context.Context, cm xresource.Claim) error // SelectDefaults copies default values from the XRD if necessary. func (fn DefaultsSelectorFn) SelectDefaults(ctx context.Context, cm xresource.Claim) error { return fn(ctx, cm) } // A Reconciler reconciles claims by creating exactly one kind of composite // resource (XR). Each claim kind should create an instance of this controller // for each XR kind they can bind to. Each controller must watch its subset of // claims and any XRs they bind to. type Reconciler struct { client client.Client gvkClaim schema.GroupVersionKind gvkXR schema.GroupVersionKind managedFields ManagedFieldsUpgrader // The below structs embed the set of interfaces used to implement the // composite resource claim reconciler. We do this primarily for // readability, so that the reconciler logic reads r.composite.Sync(), // r.claim.Finalize(), etc. 
composite crComposite claim crClaim log logging.Logger record event.Recorder conditions conditions.Manager } type crComposite struct { CompositeSyncer ConnectionPropagator } func defaultCRComposite(c client.Client) crComposite { return crComposite{ CompositeSyncer: NewClientSideCompositeSyncer(c, names.NewNameGenerator(c)), ConnectionPropagator: NewAPIConnectionPropagator(c), } } type crClaim struct { resource.Finalizer } func defaultCRClaim(c client.Client) crClaim { return crClaim{ Finalizer: resource.NewAPIFinalizer(c, finalizer), } } // A ReconcilerOption configures a Reconciler. type ReconcilerOption func(*Reconciler) // WithManagedFieldsUpgrader specifies how the Reconciler should upgrade claim // and composite resource (XR) managed fields from client-side apply to // server-side apply. func WithManagedFieldsUpgrader(u ManagedFieldsUpgrader) ReconcilerOption { return func(r *Reconciler) { r.managedFields = u } } // WithCompositeSyncer specifies how the Reconciler should sync claims with // composite resources (XRs). func WithCompositeSyncer(cs CompositeSyncer) ReconcilerOption { return func(r *Reconciler) { r.composite.CompositeSyncer = cs } } // WithConnectionPropagator specifies which ConnectionPropagator should be used // to propagate resource connection details to their claim. func WithConnectionPropagator(p ConnectionPropagator) ReconcilerOption { return func(r *Reconciler) { r.composite.ConnectionPropagator = p } } // WithClaimFinalizer specifies which ClaimFinalizer should be used to finalize // claims when they are deleted. func WithClaimFinalizer(f resource.Finalizer) ReconcilerOption { return func(r *Reconciler) { r.claim.Finalizer = f } } // WithLogger specifies how the Reconciler should log messages. func WithLogger(l logging.Logger) ReconcilerOption { return func(r *Reconciler) { r.log = l } } // WithRecorder specifies how the Reconciler should record events. 
func WithRecorder(er event.Recorder) ReconcilerOption {
	return func(r *Reconciler) {
		r.record = er
	}
}

// NewReconciler returns a Reconciler that reconciles composite resource claims of
// the supplied ClaimKind with resources of the supplied CompositeKind.
// The returned Reconciler will apply only the ObjectMetaConfigurator by
// default; most callers should supply one or more CompositeConfigurators to
// configure their composite resources.
func NewReconciler(c client.Client, of, with schema.GroupVersionKind, o ...ReconcilerOption) *Reconciler {
	// Defaults below may be overridden by the supplied ReconcilerOptions.
	r := &Reconciler{
		client:        c,
		gvkClaim:      of,
		gvkXR:         with,
		managedFields: &NopManagedFieldsUpgrader{},
		composite:     defaultCRComposite(c),
		claim:         defaultCRClaim(c),
		log:           logging.NewNopLogger(),
		record:        event.NewNopRecorder(),
		conditions:    conditions.ObservedGenerationPropagationManager{},
	}

	for _, ro := range o {
		ro(r)
	}

	return r
}

// Reconcile a composite resource claim with a concrete composite resource.
//
// Broad flow: get the claim; honor the pause annotation; get the bound XR (if
// any); reject claims bound to an XR that references a different claim;
// upgrade managed fields; handle deletion; add the finalizer; sync claim and
// XR; propagate conditions and connection details back to the claim. Every
// early-return path pairs a claim status update with a specific requeue
// decision, so statement order here is significant.
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { //nolint:gocognit // Complexity is tough to avoid here.
	log := r.log.WithValues("request", req)
	log.Debug("Reconciling")

	ctx, cancel := context.WithTimeout(ctx, reconcileTimeout)
	defer cancel()

	cm := claim.New(claim.WithGroupVersionKind(r.gvkClaim))
	if err := r.client.Get(ctx, req.NamespacedName, cm); err != nil {
		// There's no need to requeue if we no longer exist. Otherwise we'll be
		// requeued implicitly because we return an error.
		log.Debug(errGetClaim, "error", err)
		return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetClaim)
	}

	status := r.conditions.For(cm)

	record := r.record.WithAnnotations("external-name", meta.GetExternalName(cm))
	log = log.WithValues(
		"uid", cm.GetUID(),
		"version", cm.GetResourceVersion(),
		"external-name", meta.GetExternalName(cm),
	)

	// Check the pause annotation and return if it has the value "true" after
	// logging, publishing an event and updating the Synced status condition.
	if meta.IsPaused(cm) {
		r.record.Event(cm, event.Normal(reasonPaused, reconcilePausedMsg))
		status.MarkConditions(xpv1.ReconcilePaused().WithMessage(reconcilePausedMsg))
		// If the pause annotation is removed, we will have a chance to
		// reconcile again and resume and if status update fails, we will
		// reconcile again to retry to update the status.
		return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	xr := composite.New(composite.WithGroupVersionKind(r.gvkXR), composite.WithSchema(composite.SchemaLegacy))
	if ref := cm.GetResourceReference(); ref != nil {
		record = record.WithAnnotations("composite-name", cm.GetResourceReference().Name)
		log = log.WithValues("composite-name", cm.GetResourceReference().Name)

		// NotFound is tolerated here: a missing XR leaves xr empty and we
		// fall through to (re)create it during Sync below.
		if err := r.client.Get(ctx, types.NamespacedName{Name: ref.Name}, xr); resource.IgnoreNotFound(err) != nil {
			err = errors.Wrap(err, errGetComposite)
			record.Event(cm, event.Warning(reasonBind, err))
			status.MarkConditions(xpv1.ReconcileError(err))
			return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
		}
	}

	// Return early if the claim references an XR that doesn't reference it.
	//
	// We don't requeue in this situation because the claim will need human
	// intervention before we can proceed (e.g. fixing the ref), and we'll be
	// queued implicitly when the claim is edited.
	//
	// A claim might be able to delete an XR it's not bound to, as long as the
	// XR is bindable but not yet bound. This is because we only check the claim
	// ref if the XR has one - this allows us to bind unbound claims. Given that
	// the claim could bind this XR, then be deleted and in turn delete the XR
	// this is not an issue.
	if ref := xr.GetClaimReference(); meta.WasCreated(xr) && ref != nil && !cmp.Equal(cm.GetReference(), ref) {
		err := errors.Errorf(errFmtUnbound, xr.GetName(), ref.Name)
		record.Event(cm, event.Warning(reasonBind, err))
		status.MarkConditions(xpv1.ReconcileError(err))
		return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	// TODO(negz): Remove this call to Upgrade once no supported version of
	// Crossplane uses client-side apply to sync claims with XRs. We only need
	// to upgrade field managers if _this controller_ might have applied the XR
	// before using the default client-side apply field manager "crossplane",
	// but now wants to use server-side apply instead.
	if err := r.managedFields.Upgrade(ctx, xr, FieldOwnerXR); err != nil {
		// Conflicts are expected benign races; requeue silently.
		if kerrors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		err = errors.Wrap(err, errUpgradeManagedFields)
		record.Event(cm, event.Warning(reasonBind, err))
		status.MarkConditions(xpv1.ReconcileError(err))
		return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	if meta.WasDeleted(cm) {
		log = log.WithValues("deletion-timestamp", cm.GetDeletionTimestamp())
		status.MarkConditions(xpv1.Deleting())

		if meta.WasCreated(xr) {
			// Foreground deletion means we must not remove the claim's
			// finalizer until the XR is fully gone.
			requiresForegroundDeletion := false
			if cdp := cm.GetCompositeDeletePolicy(); cdp != nil && *cdp == xpv1.CompositeDeleteForeground {
				requiresForegroundDeletion = true
			}
			if meta.WasDeleted(xr) && requiresForegroundDeletion {
				log.Debug("Waiting for the XR to finish deleting (foreground deletion)")
				return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
			}
			do := &client.DeleteOptions{}
			if requiresForegroundDeletion {
				client.PropagationPolicy(metav1.DeletePropagationForeground).ApplyToDelete(do)
			}
			if err := r.client.Delete(ctx, xr, do); resource.IgnoreNotFound(err) != nil {
				err = errors.Wrap(err, errDeleteComposite)
				record.Event(cm, event.Warning(reasonDelete, err))
				status.MarkConditions(xpv1.ReconcileError(err))
				return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
			}
			if requiresForegroundDeletion {
				log.Debug("Waiting for the XR to finish deleting (foreground deletion)")
				return reconcile.Result{Requeue: true}, nil
			}
		}
		record.Event(cm, event.Normal(reasonDelete, "Successfully deleted composite resource"))

		if err := r.claim.RemoveFinalizer(ctx, cm); err != nil {
			err = errors.Wrap(err, errRemoveFinalizer)
			record.Event(cm, event.Warning(reasonDelete, err))
			status.MarkConditions(xpv1.ReconcileError(err))
			return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
		}

		log.Debug("Successfully deleted claim")
		status.MarkConditions(xpv1.ReconcileSuccess())
		return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	if err := r.claim.AddFinalizer(ctx, cm); err != nil {
		// Conflicts are expected benign races; requeue silently.
		if kerrors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		err = errors.Wrap(err, errAddFinalizer)
		record.Event(cm, event.Warning(reasonBind, err))
		status.MarkConditions(xpv1.ReconcileError(err))
		return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	// The XR's claim reference before syncing. Used to determine if we bind it.
	before := xr.GetClaimReference()

	// Create (if necessary), bind, and sync an XR with the claim.
	if err := r.composite.Sync(ctx, cm, xr); err != nil {
		// Conflicts are expected benign races; requeue silently.
		if kerrors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		err = errors.Wrap(err, errSync)
		record.Event(cm, event.Warning(reasonBind, err))
		status.MarkConditions(xpv1.ReconcileError(err))
		return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	// The XR didn't reference the claim before the sync, but does now.
	if ref := cm.GetReference(); !cmp.Equal(before, ref) && cmp.Equal(xr.GetClaimReference(), ref) {
		record.Event(cm, event.Normal(reasonBind, "Successfully bound composite resource"))
	}

	status.MarkConditions(xpv1.ReconcileSuccess())

	// Copy any custom status conditions from the XR to the claim.
	for _, cType := range xr.GetClaimConditionTypes() {
		c := xr.GetCondition(cType)
		status.MarkConditions(c)
	}

	if !resource.IsConditionTrue(xr.GetCondition(xpv1.TypeReady)) {
		record.Event(cm, event.Normal(reasonBind, "Composite resource is not yet ready"))

		// We should be watching the composite resource and will have a
		// request queued if it changes, so no need to requeue.
		status.MarkConditions(Waiting())
		return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}

	propagated, err := r.composite.PropagateConnection(ctx, cm, xr)
	if err != nil {
		err = errors.Wrap(err, errPropagateCDs)
		record.Event(cm, event.Warning(reasonPropagate, err))
		status.MarkConditions(xpv1.ReconcileError(err))
		return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
	}
	if propagated {
		cm.SetConnectionDetailsLastPublishedTime(&metav1.Time{Time: time.Now()})
		record.Event(cm, event.Normal(reasonPropagate, "Successfully propagated connection details from composite resource"))
	}

	// We have a watch on both the claim and its composite, so there's no
	// need to requeue here.
	status.MarkConditions(xpv1.Available())
	return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, cm), errUpdateClaimStatus)
}

// Waiting returns a condition that indicates the composite resource claim is
// currently waiting for its composite resource to become ready.
func Waiting() xpv1.Condition {
	return xpv1.Condition{
		Type:               xpv1.TypeReady,
		Status:             corev1.ConditionFalse,
		LastTransitionTime: metav1.Now(),
		Reason:             xpv1.ConditionReason("Waiting"),
		Message:            "Claim is waiting for composite resource to become Ready",
	}
}
/*
Copyright 2024 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package claim

import (
	"context"
	"fmt"

	"dario.cat/mergo"
	"github.com/google/go-cmp/cmp"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	v1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
	"github.com/crossplane/crossplane/internal/names"
	"github.com/crossplane/crossplane/internal/xcrd"
	"github.com/crossplane/crossplane/internal/xresource/unstructured/claim"
	"github.com/crossplane/crossplane/internal/xresource/unstructured/composite"
)

// Error strings returned (wrapped) by the client-side composite syncer.
const (
	errUpdateClaim          = "cannot update claim"
	errUnsupportedClaimSpec = "claim spec was not an object"
	errGenerateName         = "cannot generate a name for composite resource"
	errApplyComposite       = "cannot apply composite resource"
	errMergeClaimSpec       = "unable to merge claim spec"
	errMergeClaimStatus     = "unable to merge claim status"
)

// A ClientSideCompositeSyncer binds and syncs a claim with a composite resource
// (XR). It uses client-side apply to update the claim and the composite.
type ClientSideCompositeSyncer struct {
	client resource.ClientApplicator
	names  names.NameGenerator
}

// NewClientSideCompositeSyncer returns a CompositeSyncer that uses client-side
// apply to sync a claim with a composite resource.
func NewClientSideCompositeSyncer(c client.Client, ng names.NameGenerator) *ClientSideCompositeSyncer {
	return &ClientSideCompositeSyncer{
		client: resource.ClientApplicator{
			Client:     c,
			Applicator: resource.NewAPIPatchingApplicator(c),
		},
		names: ng,
	}
}

// Sync the supplied claim with the supplied composite resource (XR). Syncing
// may involve creating and binding the XR.
//
// The first half of this method syncs claim -> XR; everything after the Apply
// call syncs XR -> claim. Statement order is significant: the claim must
// reference the XR before the XR is applied, so a crash between the two
// operations cannot leak an unreferenced XR.
func (s *ClientSideCompositeSyncer) Sync(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error {
	// First we sync claim -> XR.

	// It's possible we're being asked to configure a statically provisioned XR.
	// We should respect its existing external name annotation.
	en := meta.GetExternalName(xr)

	// Do not propagate *.kubernetes.io annotations/labels from claim to XR. For
	// example kubectl.kubernetes.io/last-applied-configuration should not be
	// propagated.
	// See https://kubernetes.io/docs/reference/labels-annotations-taints/
	// for all annotations and their semantic
	meta.AddAnnotations(xr, withoutReservedK8sEntries(cm.GetAnnotations()))
	meta.AddLabels(xr, withoutReservedK8sEntries(cm.GetLabels()))
	meta.AddLabels(xr, map[string]string{
		xcrd.LabelKeyClaimName:      cm.GetName(),
		xcrd.LabelKeyClaimNamespace: cm.GetNamespace(),
	})

	// If the bound XR already exists we want to restore its original external
	// name annotation in order to ensure we don't try to rename anything after
	// the fact.
	if meta.WasCreated(xr) && en != "" {
		meta.SetExternalName(xr, en)
	}

	// We want to propagate the claim's spec to the composite's spec, but first
	// we must filter out any well-known fields that are unique to claims. We do
	// this by:
	// 1. Grabbing a map whose keys represent all well-known claim fields.
	// 2. Deleting any well-known fields that we want to propagate.
	// 3. Using the resulting map keys to filter the claim's spec.
	wellKnownClaimFields := xcrd.CompositeResourceClaimSpecProps(nil)
	for _, field := range xcrd.PropagateSpecProps {
		delete(wellKnownClaimFields, field)
	}

	// CompositionRevisionRef is a special field which needs to be propagated
	// based on the Update policy. If the policy is `Manual`, we need to remove
	// CompositionRevisionRef from wellKnownClaimFields, so it is propagated
	// from the claim to the XR.
	if xr.GetCompositionUpdatePolicy() != nil && *xr.GetCompositionUpdatePolicy() == xpv1.UpdateManual {
		delete(wellKnownClaimFields, xcrd.CompositionRevisionRef)
	}

	cmSpec, ok := cm.Object["spec"].(map[string]any)
	if !ok {
		return errors.New(errUnsupportedClaimSpec)
	}

	// Propagate the claim's spec (minus well known fields) to the XR's spec.
	xr.Object["spec"] = withoutKeys(cmSpec, xcrd.GetPropFields(wellKnownClaimFields)...)

	// We overwrite the entire XR spec above, so we wait until this point to set
	// the claim reference.
	xr.SetClaimReference(cm.GetReference())

	// If the claim references an XR, make sure we're going to apply that XR. We
	// do this just in case the XR exists, but we couldn't get it due to a stale
	// cache.
	if ref := cm.GetResourceReference(); ref != nil {
		xr.SetName(ref.Name)
	}

	// If the XR doesn't exist, derive a name from the claim. The generated name
	// is likely (but not guaranteed) to be available when we create the XR. If
	// taken, then we are going to update an existing XR, probably hijacking it
	// from another claim.
	if !meta.WasCreated(xr) {
		xr.SetGenerateName(fmt.Sprintf("%s-", cm.GetName()))

		// GenerateName is a no-op if the xr already has a name set.
		if err := s.names.GenerateName(ctx, xr); err != nil {
			return errors.Wrap(err, errGenerateName)
		}
	}

	// We're now done syncing the XR from the claim. If this is a new XR it's
	// important that we update the claim to reference it before we create it.
	// This ensures we don't leak an XR. We could leak an XR if we created an XR
	// then crashed before saving a reference to it. We'd create another XR on
	// the next reconcile.
	existing := cm.GetResourceReference()
	proposed := xr.GetReference()
	if !cmp.Equal(existing, proposed) {
		cm.SetResourceReference(proposed)
		if err := s.client.Update(ctx, cm); err != nil {
			return errors.Wrap(err, errUpdateClaim)
		}
	}

	// Apply the XR, unless it's a no-op change.
	err := s.client.Apply(ctx, xr, resource.AllowUpdateIf(func(old, obj runtime.Object) bool {
		return !cmp.Equal(old, obj)
	}))
	// A "not allowed" error here just means the apply was a no-op; ignore it.
	if err := resource.Ignore(resource.IsNotAllowed, err); err != nil {
		return errors.Wrap(err, errApplyComposite)
	}

	// Below this point we're syncing XR status -> claim status.

	// Merge the XR's status into the claim's status.
	if err := merge(cm.Object["status"], xr.Object["status"],
		// XR status fields overwrite non-empty claim fields.
		withMergeOptions(mergo.WithOverride),
		// Don't sync XR machinery (i.e. status conditions, connection details).
		withSrcFilter(xcrd.GetPropFields(xcrd.CompositeResourceStatusProps(v1.CompositeResourceScopeLegacyCluster))...)); err != nil {
		return errors.Wrap(err, errMergeClaimStatus)
	}

	if err := s.client.Status().Update(ctx, cm); err != nil {
		return errors.Wrap(err, errUpdateClaimStatus)
	}

	// Propagate the actual external name back from the XR to the claim if it's
	// set. The name we're propagating here may be a name the XR must
	// enforce (i.e. overriding any requested by the claim) but will often
	// actually just be propagating back a name that was already propagated
	// forward from the claim to the XR earlier in this method.
	if en := meta.GetExternalName(xr); en != "" {
		meta.SetExternalName(cm, en)
	}

	// We want to propagate the XR's spec to the claim's spec, but first we must
	// filter out any well-known fields that are unique to XR. We do this by:
	// 1. Grabbing a map whose keys represent all well-known XR fields.
	// 2. Deleting any well-known fields that we want to propagate.
	// 3. Filtering OUT the remaining map keys from the XR's spec so that we end
	//    up adding only the well-known fields to the claim's spec.
	wellKnownXRFields := xcrd.CompositeResourceSpecProps(v1.CompositeResourceScopeLegacyCluster, nil)
	for _, field := range xcrd.PropagateSpecProps {
		delete(wellKnownXRFields, field)
	}

	// CompositionRevisionRef is a special field which needs to be propagated
	// based on the Update policy. If the policy is `Automatic`, we need to
	// overwrite the claim's value with the XR's which should be the
	// `currentRevision`
	if xr.GetCompositionUpdatePolicy() != nil && *xr.GetCompositionUpdatePolicy() == xpv1.UpdateAutomatic {
		cm.SetCompositionRevisionReference(xr.GetCompositionRevisionReference())
	}

	// Propagate the XR's spec (minus well known fields) to the claim's spec.
	if err := merge(cm.Object["spec"], xr.Object["spec"],
		withSrcFilter(xcrd.GetPropFields(wellKnownXRFields)...)); err != nil {
		return errors.Wrap(err, errMergeClaimSpec)
	}

	return errors.Wrap(s.client.Update(ctx, cm), errUpdateClaim)
}
/*
Copyright 2024 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package claim

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/fieldpath"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	v1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
	"github.com/crossplane/crossplane/internal/names"
	"github.com/crossplane/crossplane/internal/xcrd"
	"github.com/crossplane/crossplane/internal/xresource/unstructured/claim"
	"github.com/crossplane/crossplane/internal/xresource/unstructured/composite"
)

// Error strings.
const (
	errCreatePatch                = "cannot create patch"
	errPatchFieldManagers         = "cannot patch field managers"
	errUnsupportedCompositeStatus = "composite resource status was not an object"
)

// Server-side-apply field owners.
const (
	// FieldOwnerXR owns the fields this controller mutates on composite
	// resources (XRs).
	FieldOwnerXR = "apiextensions.crossplane.io/claim"
)

// A NopManagedFieldsUpgrader does nothing.
type NopManagedFieldsUpgrader struct{}

// Upgrade does nothing.
func (u *NopManagedFieldsUpgrader) Upgrade(_ context.Context, _ client.Object, _ string) error {
	return nil
}

// A PatchingManagedFieldsUpgrader uses a JSON patch to upgrade an object's
// managed fields from client-side to server-side apply. The upgrade is a no-op
// if the object does not need upgrading.
type PatchingManagedFieldsUpgrader struct {
	client client.Writer
}

// NewPatchingManagedFieldsUpgrader returns a ManagedFieldsUpgrader that uses a
// JSON patch to upgrade an object's managed fields from client-side to
// server-side apply.
func NewPatchingManagedFieldsUpgrader(w client.Writer) *PatchingManagedFieldsUpgrader {
	return &PatchingManagedFieldsUpgrader{client: w}
}

// Upgrade the supplied object's field managers from client-side to server-side
// apply.
//
// This is a multi-step process.
//
// Step 1: All fields are owned by either manager 'crossplane', operation
// 'Update' or manager 'apiextensions.crossplane.io/composite', operation
// 'Apply'. This represents all fields set by the claim or XR controller up to
// this point.
//
// Step 2: Upgrade is called for the first time. We delete all field managers.
//
// Step 3: The claim controller server-side applies its fully specified intent
// as field manager 'apiextensions.crossplane.io/claim'. This becomes the
// manager of all the fields that are part of the claim controller's fully
// specified intent. All existing fields the claim controller didn't specify
// become owned by a special manager - 'before-first-apply', operation 'Update'.
//
// Step 4: Upgrade is called for the second time. It deletes the
// 'before-first-apply' field manager entry. Only the claim field manager
// remains.
//
// Step 5: Eventually the XR reconciler updates a field (e.g. spec.resourceRefs)
// and becomes owner of that field.
func (u *PatchingManagedFieldsUpgrader) Upgrade(ctx context.Context, obj client.Object, ssaManager string) error {
	// The XR doesn't exist, nothing to upgrade.
	if !meta.WasCreated(obj) {
		return nil
	}

	// Scan the managed fields once, recording whether our SSA manager and the
	// 'before-first-apply' manager are present (and the latter's index, which
	// the JSON patch below needs to remove that specific entry).
	foundSSA := false
	foundBFA := false
	idxBFA := -1

	for i, e := range obj.GetManagedFields() {
		if e.Manager == ssaManager {
			foundSSA = true
		}
		if e.Manager == "before-first-apply" {
			foundBFA = true
			idxBFA = i
		}
	}

	switch {
	// If our SSA field manager exists and the before-first-apply field manager
	// doesn't, we've already done the upgrade. Don't do it again.
	case foundSSA && !foundBFA:
		return nil

	// We found our SSA field manager but also before-first-apply. It should now
	// be safe to delete before-first-apply.
	//
	// The resourceVersion test makes this patch conditional: it fails (and we
	// retry later) if the object changed since we read it.
	case foundSSA && foundBFA:
		p := []byte(fmt.Sprintf(`[ {"op":"remove","path":"/metadata/managedFields/%d"}, {"op":"replace","path":"/metadata/resourceVersion","value":"%s"} ]`, idxBFA, obj.GetResourceVersion()))
		return errors.Wrap(resource.IgnoreNotFound(u.client.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, p))), "cannot remove before-first-apply from field managers")

	// We didn't find our SSA field manager or the before-first-apply field
	// manager. This means we haven't started the upgrade. The first thing we
	// want to do is clear all managed fields. After we do this we'll let our
	// SSA field manager apply the fields it cares about. The result will be
	// that our SSA field manager shares ownership with a new manager named
	// 'before-first-apply'.
	default:
		p := []byte(fmt.Sprintf(`[ {"op":"replace","path": "/metadata/managedFields","value": [{}]}, {"op":"replace","path":"/metadata/resourceVersion","value":"%s"} ]`, obj.GetResourceVersion()))
		return errors.Wrap(resource.IgnoreNotFound(u.client.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, p))), "cannot clear field managers")
	}
}

// A ServerSideCompositeSyncer binds and syncs a claim with a composite resource
// (XR). It uses server-side apply to update the XR.
type ServerSideCompositeSyncer struct { client client.Client names names.NameGenerator } // NewServerSideCompositeSyncer returns a CompositeSyncer that uses server-side // apply to sync a claim with a composite resource. func NewServerSideCompositeSyncer(c client.Client, ng names.NameGenerator) *ServerSideCompositeSyncer { return &ServerSideCompositeSyncer{client: c, names: ng} } // Sync the supplied claim with the supplied composite resource (XR). Syncing // may involve creating and binding the XR. func (s *ServerSideCompositeSyncer) Sync(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error { // First we sync claim -> XR. // Create an empty XR patch object. We'll use this object to ensure we only // SSA our desired state, not the state we previously read from the API // server. xrPatch := composite.New(composite.WithGroupVersionKind(xr.GroupVersionKind()), composite.WithSchema(composite.SchemaLegacy)) // If the claim references an XR, make sure we're going to apply that XR. We // do this instead of using the supplied XR's name just in case the XR // exists, but we couldn't get it due to a stale cache. if ref := cm.GetResourceReference(); ref != nil { xrPatch.SetName(ref.Name) } // If the XR doesn't have a name (i.e. doesn't exist), derive a name from // the claim. The generated name is likely (but not guaranteed) to be // available when we create the XR. If taken, then we are going to update an // existing XR, probably hijacking it from another claim. if xrPatch.GetName() == "" { xrPatch.SetGenerateName(fmt.Sprintf("%s-", cm.GetName())) if err := s.names.GenerateName(ctx, xrPatch); err != nil { return errors.Wrap(err, errGenerateName) } } // It's possible we're being asked to configure a statically provisioned XR. // We should respect its existing external name annotation. en := meta.GetExternalName(xr) // Do not propagate *.kubernetes.io annotations/labels from claim to XR. 
For // example kubectl.kubernetes.io/last-applied-configuration should not be // propagated. // See https://kubernetes.io/docs/reference/labels-annotations-taints/ // for all annotations and their semantic if ann := withoutReservedK8sEntries(cm.GetAnnotations()); len(ann) > 0 { meta.AddAnnotations(xrPatch, withoutReservedK8sEntries(cm.GetAnnotations())) } meta.AddLabels(xrPatch, withoutReservedK8sEntries(cm.GetLabels())) meta.AddLabels(xrPatch, map[string]string{ xcrd.LabelKeyClaimName: cm.GetName(), xcrd.LabelKeyClaimNamespace: cm.GetNamespace(), }) // Restore the XR's original external name annotation in order to ensure we // don't try to rename anything after the fact. if en != "" { meta.SetExternalName(xrPatch, en) } // We want to propagate the claim's spec to the composite's spec, but first // we must filter out any well-known fields that are unique to claims. We do // this by: // 1. Grabbing a map whose keys represent all well-known claim fields. // 2. Deleting any well-known fields that we want to propagate. // 3. Using the resulting map keys to filter the claim's spec. wellKnownClaimFields := xcrd.CompositeResourceClaimSpecProps(nil) for _, field := range xcrd.PropagateSpecProps { delete(wellKnownClaimFields, field) } // Propagate composition revision ref from the claim if the update policy is // manual. When the update policy is manual the claim controller is // authoritative for this field. See below for the automatic case. if xr.GetCompositionUpdatePolicy() != nil && *xr.GetCompositionUpdatePolicy() == xpv1.UpdateManual { delete(wellKnownClaimFields, xcrd.CompositionRevisionRef) } cmSpec, ok := cm.Object["spec"].(map[string]any) if !ok { return errors.New(errUnsupportedClaimSpec) } // Propagate the claim's spec (minus well known fields) to the XR's spec. xrPatch.Object["spec"] = withoutKeys(cmSpec, xcrd.GetPropFields(wellKnownClaimFields)...) // We overwrite the entire XR spec above, so we wait until this point to set // the claim reference. 
xrPatch.SetClaimReference(cm.GetReference()) // Below this point we're syncing XR -> claim. // Bind the claim to the XR. If this is a new XR it's important that we // apply the claim before we create it. This ensures we don't leak an XR. We // could leak an XR if we created an XR then crashed before saving a // reference to it. We'd create another XR on the next reconcile. cm.SetResourceReference(xrPatch.GetReference()) // Propagate the actual external name back from the composite to the // claim if it's set. The name we're propagating here will may be a name // the XR must enforce (i.e. overriding any requested by the claim) but // will often actually just be propagating back a name that was already // propagated forward from the claim to the XR during the // preceding configure phase. if en := meta.GetExternalName(xr); en != "" { meta.SetExternalName(cm, en) } // Propagate composition ref from the XR if the claim doesn't have an // opinion. Composition and revision selectors only propagate from claim -> // XR. When a claim has selectors **and no reference** the flow should be: // // 1. Claim controller propagates selectors claim -> XR. // 2. XR controller uses selectors to set XR's composition ref. // 3. Claim controller propagates ref XR -> claim. // // When a claim sets a composition ref, it supercedes selectors. It should // only be propagated claim -> XR. if ref := xr.GetCompositionReference(); ref != nil && cm.GetCompositionReference() == nil { cm.SetCompositionReference(ref) } // Propagate composition revision ref from the XR if the update policy is // automatic. When the update policy is automatic the XR controller is // authoritative for this field. It will update the XR's ref as new // revisions become available, and we want to propgate the ref XR -> claim. 
if p := xr.GetCompositionUpdatePolicy(); p != nil && *p == xpv1.UpdateAutomatic && xr.GetCompositionRevisionReference() != nil { cm.SetCompositionRevisionReference(xr.GetCompositionRevisionReference()) } // It's important that we update the claim before we apply the XR, to make // sure the claim's resourceRef points to the XR before we create the XR. // Otherwise we risk leaking an XR. // // It's also important that the API server will reject this update if we're // reconciling an old claim, e.g. due to a stale cache. It's possible that // we're seeing an old version without a resourceRef set, but in reality an // XR has already been created. We don't want to leak it and create another. if err := s.client.Update(ctx, cm); err != nil { return errors.Wrap(err, errUpdateClaim) } if err := s.client.Patch(ctx, xrPatch, client.Apply, client.ForceOwnership, client.FieldOwner(FieldOwnerXR)); err != nil { return errors.Wrap(err, errApplyComposite) } // Update the XR passed to this method to reflect the state returned by the // API server when we patched it. xr.Object = xrPatch.Object m, ok := xr.Object["status"] if !ok { // If the XR doesn't have a status yet there's nothing else to sync. // Just update the claim passed to this method to reflect the state // returned by the API server when we patched it. return nil } xrStatus, ok := m.(map[string]any) if !ok { return errors.New(errUnsupportedCompositeStatus) } // Preserve Crossplane machinery, like status conditions. cmcs := xpv1.ConditionedStatus{} _ = fieldpath.Pave(cm.Object).GetValueInto("status", &cmcs) pub := cm.GetConnectionDetailsLastPublishedTime() // Update the claim's user-defined status fields to match the XRs. cm.Object["status"] = withoutKeys(xrStatus, xcrd.GetPropFields(xcrd.CompositeResourceStatusProps(v1.CompositeResourceScopeLegacyCluster))...) if cmcs.Conditions != nil { cm.SetConditions(cmcs.Conditions...) 
} if pub != nil { cm.SetConnectionDetailsLastPublishedTime(pub) } return errors.Wrap(s.client.Status().Update(ctx, cm), errUpdateClaimStatus) }
/* Copyright 2023 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package composition import ( "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" v1 "github.com/crossplane/crossplane/apis/apiextensions/v1" ) func FuzzNewCompositionRevision(f *testing.F) { f.Fuzz(func(_ *testing.T, data []byte) { f := fuzz.NewConsumer(data) c := &v1.Composition{} f.GenerateStruct(c) revision, err := f.GetInt() if err != nil { return } _ = NewCompositionRevision(c, int64(revision)) }) }
/*
Copyright 2021 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package composition creates composition revisions.
package composition

import (
	"context"
	"strconv"
	"strings"
	"time"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/event"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	v1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
	"github.com/crossplane/crossplane/internal/controller/apiextensions/controller"
)

const (
	// timeout bounds a single reconcile pass.
	timeout = 2 * time.Minute
)

// Error strings.
const (
	errGet             = "cannot get Composition"
	errListRevs        = "cannot list CompositionRevisions"
	errCreateRev       = "cannot create CompositionRevision"
	errOwnRev          = "cannot own CompositionRevision"
	errUpdateRevStatus = "cannot update CompositionRevision status"
	errUpdateRevSpec   = "cannot update CompositionRevision spec"
)

// Event reasons.
const (
	reasonCreateRev event.Reason = "CreateRevision"
	reasonUpdateRev event.Reason = "UpdateRevision"
)

// Setup adds a controller that reconciles Compositions by creating new
// CompositionRevisions for each revision of the Composition's spec.
func Setup(mgr ctrl.Manager, o controller.Options) error {
	name := "revisions/" + strings.ToLower(v1.CompositionGroupKind)

	r := NewReconciler(mgr,
		WithLogger(o.Logger.WithValues("controller", name)),
		WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))))

	// Watch Compositions and the CompositionRevisions they own; rate-limit
	// and silently requeue on optimistic-concurrency conflicts.
	return ctrl.NewControllerManagedBy(mgr).
		Named(name).
		For(&v1.Composition{}).
		Owns(&v1.CompositionRevision{}).
		WithOptions(o.ForControllerRuntime()).
		Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(r), o.GlobalRateLimiter))
}

// ReconcilerOption is used to configure the Reconciler.
type ReconcilerOption func(*Reconciler)

// WithLogger specifies how the Reconciler should log messages.
func WithLogger(log logging.Logger) ReconcilerOption {
	return func(r *Reconciler) {
		r.log = log
	}
}

// WithRecorder specifies how the Reconciler should record Kubernetes events.
func WithRecorder(er event.Recorder) ReconcilerOption {
	return func(r *Reconciler) {
		r.record = er
	}
}

// NewReconciler returns a Reconciler of Compositions.
func NewReconciler(mgr manager.Manager, opts ...ReconcilerOption) *Reconciler {
	// Defaults below may be overridden by the supplied ReconcilerOptions.
	r := &Reconciler{
		client: mgr.GetClient(),
		log:    logging.NewNopLogger(),
		record: event.NewNopRecorder(),
	}

	for _, f := range opts {
		f(r)
	}

	return r
}

// A Reconciler reconciles Compositions by creating new CompositionRevisions for
// each revision of the Composition's spec.
type Reconciler struct {
	client client.Client

	log    logging.Logger
	record event.Recorder
}

// Reconcile a Composition.
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := r.log.WithValues("request", req) log.Debug("Reconciling") ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() comp := &v1.Composition{} if err := r.client.Get(ctx, req.NamespacedName, comp); err != nil { log.Debug(errGet, "error", err) r.record.Event(comp, event.Warning(reasonCreateRev, errors.Wrap(err, errGet))) return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGet) } if meta.WasDeleted(comp) { return reconcile.Result{}, nil } currentHash := comp.Hash() log = log.WithValues( "uid", comp.GetUID(), "version", comp.GetResourceVersion(), "name", comp.GetName(), "spec-hash", currentHash, ) rl := &v1.CompositionRevisionList{} if err := r.client.List(ctx, rl, client.MatchingLabels{v1.LabelCompositionName: comp.GetName()}); err != nil { log.Debug(errListRevs, "error", err) r.record.Event(comp, event.Warning(reasonCreateRev, errors.Wrap(err, errListRevs))) return reconcile.Result{}, errors.Wrap(err, errListRevs) } var latestRev, existingRev int64 if lr := v1.LatestRevision(comp, rl.Items); lr != nil { latestRev = lr.Spec.Revision } for i := range rl.Items { rev := &rl.Items[i] if !metav1.IsControlledBy(rev, comp) { // We already listed revisions with Composition name label pointing // to this Composition. Let's make sure they are controlled by it. // Note(turkenh): Owner references are stripped out when a resource // is moved from one cluster to another (i.e. backup/restore) since // the UID of the owner is not preserved. We need to make sure to // re-add the owner reference to all revisions of this Composition. 
if err := meta.AddControllerReference(rev, meta.AsController(meta.TypedReferenceTo(comp, v1.CompositionGroupVersionKind))); err != nil { log.Debug(errOwnRev, "error", err) r.record.Event(comp, event.Warning(reasonUpdateRev, err)) return reconcile.Result{}, errors.Wrap(err, errOwnRev) } if err := r.client.Update(ctx, rev); err != nil { log.Debug(errOwnRev, "error", err) r.record.Event(comp, event.Warning(reasonUpdateRev, err)) return reconcile.Result{}, errors.Wrap(err, errOwnRev) } } // This revision does not match our current Composition. if rev.GetLabels()[v1.LabelCompositionHash] != currentHash[:63] { continue } // This revision matches our current Composition. We don't need a new one. existingRev = rev.Spec.Revision // This revision has the highest revision number - it doesn't need updating. if rev.Spec.Revision == latestRev { continue } // This revision does not have the highest revision number. Update it so that it does. rev.Spec.Revision = latestRev + 1 if err := r.client.Update(ctx, rev); err != nil { log.Debug(errUpdateRevSpec, "error", err) if kerrors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } r.record.Event(comp, event.Warning(reasonUpdateRev, err)) return reconcile.Result{}, errors.Wrap(err, errUpdateRevSpec) } } // We start from revision 1, so 0 indicates we didn't find one. if existingRev > 0 { log.Debug("No new revision needed.", "current-revision", existingRev) return reconcile.Result{}, nil } if err := r.client.Create(ctx, NewCompositionRevision(comp, latestRev+1)); err != nil { log.Debug(errCreateRev, "error", err) r.record.Event(comp, event.Warning(reasonCreateRev, err)) return reconcile.Result{}, errors.Wrap(err, errCreateRev) } log.Debug("Created new revision", "revision", latestRev+1) r.record.Event(comp, event.Normal(reasonCreateRev, "Created new revision", "revision", strconv.FormatInt(latestRev+1, 10))) return reconcile.Result{}, nil }
/* Copyright 2020 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package composition import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/crossplane/crossplane-runtime/pkg/meta" v1 "github.com/crossplane/crossplane/apis/apiextensions/v1" ) // NewCompositionRevision creates a new revision of the supplied Composition. func NewCompositionRevision(c *v1.Composition, revision int64) *v1.CompositionRevision { hash := c.Hash() if len(hash) >= 63 { hash = hash[0:63] } nameSuffix := hash if len(nameSuffix) >= 7 { nameSuffix = nameSuffix[0:7] } cr := &v1.CompositionRevision{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", c.GetName(), nameSuffix), Labels: map[string]string{ v1.LabelCompositionName: c.GetName(), // We cannot have a label value longer than 63 chars // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set v1.LabelCompositionHash: hash, }, }, Spec: NewCompositionRevisionSpec(c.Spec, revision), } ref := meta.TypedReferenceTo(c, v1.CompositionGroupVersionKind) meta.AddOwnerReference(cr, meta.AsController(ref)) for k, v := range c.GetLabels() { cr.ObjectMeta.Labels[k] = v } return cr } // NewCompositionRevisionSpec translates a composition's spec to a composition // revision spec. func NewCompositionRevisionSpec(cs v1.CompositionSpec, revision int64) v1.CompositionRevisionSpec { conv := v1.GeneratedRevisionSpecConverter{} rs := conv.ToRevisionSpec(cs) rs.Revision = revision return rs }
/* Copyright 2023 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package manager import ( "context" "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" "github.com/crossplane/crossplane-runtime/pkg/errors" v1 "github.com/crossplane/crossplane/apis/pkg/v1" "github.com/crossplane/crossplane/internal/xpkg" "github.com/crossplane/crossplane/internal/xpkg/fake" ) func FuzzPackageRevision(f *testing.F) { f.Fuzz(func(t *testing.T, data []byte) { ff := fuzz.NewConsumer(data) pkg := &v1.Provider{} ff.GenerateStruct(pkg) fetcher := &fake.MockFetcher{ MockHead: fake.NewMockHeadFn(nil, errors.New("boom")), } r := NewPackageRevisioner(fetcher) _, _ = r.Revision(context.Background(), pkg, "") n, err := ff.GetString() if err != nil { t.Skip() } h, err := ff.GetString() if err != nil { t.Skip() } _ = xpkg.FriendlyID(n, h) }) }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package manager implements the Crossplane Package controllers.
package manager

import (
	"context"
	"fmt"
	"math"
	"reflect"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/conditions"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/event"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/apis/pkg/v1beta1"
	"github.com/crossplane/crossplane/internal/controller/pkg/controller"
	"github.com/crossplane/crossplane/internal/xpkg"
)

const (
	// reconcileTimeout bounds a single reconcile attempt.
	reconcileTimeout = 1 * time.Minute

	// pullWait is the time after which the package manager will check for
	// updated content for the given package reference. This behavior is only
	// enabled when the packagePullPolicy is Always.
	pullWait = 1 * time.Minute

	// reconcilePausedMsg is surfaced in events and conditions when the pause
	// annotation is set on a package.
	reconcilePausedMsg = "Reconciliation (including deletion) is paused via the pause annotation"
)

// pullBasedRequeue returns a result that requeues after pullWait when the
// supplied pull policy is Always, and a result that does not requeue
// otherwise.
func pullBasedRequeue(p *corev1.PullPolicy) reconcile.Result {
	if p != nil && *p == corev1.PullAlways {
		return reconcile.Result{RequeueAfter: pullWait}
	}
	return reconcile.Result{Requeue: false}
}

// Error strings.
const (
	errGetPackage                    = "cannot get package"
	errListRevisions                 = "cannot list revisions for package"
	errUnpack                        = "cannot unpack package"
	errApplyPackageRevision          = "cannot apply package revision"
	errGCPackageRevision             = "cannot garbage collect old package revision"
	errGetPullConfig                 = "cannot get image pull secret from config"
	errRewriteImage                  = "cannot rewrite image path using config"
	errUpdateStatus                  = "cannot update package status"
	errUpdateInactivePackageRevision = "cannot update inactive package revision"
	errCreateK8sClient               = "failed to initialize clientset"
	errBuildFetcher                  = "cannot build fetcher"
)

// Event reasons.
const (
	reasonList               event.Reason = "ListRevision"
	reasonUnpack             event.Reason = "UnpackPackage"
	reasonTransitionRevision event.Reason = "TransitionRevision"
	reasonGarbageCollect     event.Reason = "GarbageCollect"
	reasonInstall            event.Reason = "InstallPackageRevision"
	reasonPaused             event.Reason = "ReconciliationPaused"
	reasonImageConfig        event.Reason = "ImageConfigSelection"
)

// ReconcilerOption is used to configure the Reconciler.
type ReconcilerOption func(*Reconciler)

// WithNewPackageFn determines the type of package being reconciled.
func WithNewPackageFn(f func() v1.Package) ReconcilerOption {
	return func(r *Reconciler) {
		r.newPackage = f
	}
}

// WithNewPackageRevisionFn determines the type of package revision the
// Reconciler creates and manages.
func WithNewPackageRevisionFn(f func() v1.PackageRevision) ReconcilerOption {
	return func(r *Reconciler) {
		r.newPackageRevision = f
	}
}

// WithNewPackageRevisionListFn determines the type of package revision list
// the Reconciler uses to enumerate existing revisions.
func WithNewPackageRevisionListFn(f func() v1.PackageRevisionList) ReconcilerOption { return func(r *Reconciler) { r.newPackageRevisionList = f } } // WithRevisioner specifies how the Reconciler should acquire a package image's // revision name. func WithRevisioner(d Revisioner) ReconcilerOption { return func(r *Reconciler) { r.pkg = d } } // WithConfigStore specifies the image config store to use. func WithConfigStore(c xpkg.ConfigStore) ReconcilerOption { return func(r *Reconciler) { r.config = c } } // WithLogger specifies how the Reconciler should log messages. func WithLogger(log logging.Logger) ReconcilerOption { return func(r *Reconciler) { r.log = log } } // WithRecorder specifies how the Reconciler should record Kubernetes events. func WithRecorder(er event.Recorder) ReconcilerOption { return func(r *Reconciler) { r.record = er } } // Reconciler reconciles packages. type Reconciler struct { client resource.ClientApplicator pkg Revisioner config xpkg.ConfigStore log logging.Logger record event.Recorder conditions conditions.Manager newPackage func() v1.Package newPackageRevision func() v1.PackageRevision newPackageRevisionList func() v1.PackageRevisionList } // SetupProvider adds a controller that reconciles Providers. func SetupProvider(mgr ctrl.Manager, o controller.Options) error { name := "packages/" + strings.ToLower(v1.ProviderGroupKind) np := func() v1.Package { return &v1.Provider{} } nr := func() v1.PackageRevision { return &v1.ProviderRevision{} } nrl := func() v1.PackageRevisionList { return &v1.ProviderRevisionList{} } cs, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { return errors.Wrap(err, errCreateK8sClient) } f, err := xpkg.NewK8sFetcher(cs, append(o.FetcherOptions, xpkg.WithNamespace(o.Namespace), xpkg.WithServiceAccount(o.ServiceAccount))...) 
if err != nil { return errors.Wrap(err, errBuildFetcher) } log := o.Logger.WithValues("controller", name) opts := []ReconcilerOption{ WithNewPackageFn(np), WithNewPackageRevisionFn(nr), WithNewPackageRevisionListFn(nrl), WithRevisioner(NewPackageRevisioner(f, WithDefaultRegistry(o.DefaultRegistry))), WithConfigStore(xpkg.NewImageConfigStore(mgr.GetClient(), o.Namespace)), WithLogger(log), WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), } return ctrl.NewControllerManagedBy(mgr). Named(name). For(&v1.Provider{}). Owns(&v1.ProviderRevision{}). Watches(&v1beta1.ImageConfig{}, enqueueProvidersForImageConfig(mgr.GetClient(), log)). WithOptions(o.ForControllerRuntime()). Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(NewReconciler(mgr, opts...)), o.GlobalRateLimiter)) } // SetupConfiguration adds a controller that reconciles Configurations. func SetupConfiguration(mgr ctrl.Manager, o controller.Options) error { name := "packages/" + strings.ToLower(v1.ConfigurationGroupKind) np := func() v1.Package { return &v1.Configuration{} } nr := func() v1.PackageRevision { return &v1.ConfigurationRevision{} } nrl := func() v1.PackageRevisionList { return &v1.ConfigurationRevisionList{} } clientset, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { return errors.Wrap(err, "failed to initialize clientset") } fetcher, err := xpkg.NewK8sFetcher(clientset, append(o.FetcherOptions, xpkg.WithNamespace(o.Namespace), xpkg.WithServiceAccount(o.ServiceAccount))...) 
if err != nil { return errors.Wrap(err, "cannot build fetcher") } log := o.Logger.WithValues("controller", name) r := NewReconciler(mgr, WithNewPackageFn(np), WithNewPackageRevisionFn(nr), WithNewPackageRevisionListFn(nrl), WithRevisioner(NewPackageRevisioner(fetcher, WithDefaultRegistry(o.DefaultRegistry))), WithConfigStore(xpkg.NewImageConfigStore(mgr.GetClient(), o.Namespace)), WithLogger(log), WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), ) return ctrl.NewControllerManagedBy(mgr). Named(name). For(&v1.Configuration{}). Owns(&v1.ConfigurationRevision{}). Watches(&v1beta1.ImageConfig{}, enqueueConfigurationsForImageConfig(mgr.GetClient(), log)). WithOptions(o.ForControllerRuntime()). Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(r), o.GlobalRateLimiter)) } // SetupFunction adds a controller that reconciles Functions. func SetupFunction(mgr ctrl.Manager, o controller.Options) error { name := "packages/" + strings.ToLower(v1.FunctionGroupKind) np := func() v1.Package { return &v1.Function{} } nr := func() v1.PackageRevision { return &v1.FunctionRevision{} } nrl := func() v1.PackageRevisionList { return &v1.FunctionRevisionList{} } cs, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { return errors.Wrap(err, errCreateK8sClient) } f, err := xpkg.NewK8sFetcher(cs, append(o.FetcherOptions, xpkg.WithNamespace(o.Namespace), xpkg.WithServiceAccount(o.ServiceAccount))...) if err != nil { return errors.Wrap(err, errBuildFetcher) } log := o.Logger.WithValues("controller", name) opts := []ReconcilerOption{ WithNewPackageFn(np), WithNewPackageRevisionFn(nr), WithNewPackageRevisionListFn(nrl), WithRevisioner(NewPackageRevisioner(f, WithDefaultRegistry(o.DefaultRegistry))), WithConfigStore(xpkg.NewImageConfigStore(mgr.GetClient(), o.Namespace)), WithLogger(log), WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), } return ctrl.NewControllerManagedBy(mgr). Named(name). For(&v1.Function{}). 
Owns(&v1.FunctionRevision{}). Watches(&v1beta1.ImageConfig{}, enqueueFunctionsForImageConfig(mgr.GetClient(), log)). WithOptions(o.ForControllerRuntime()). Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(NewReconciler(mgr, opts...)), o.GlobalRateLimiter)) } // NewReconciler creates a new package reconciler. func NewReconciler(mgr ctrl.Manager, opts ...ReconcilerOption) *Reconciler { r := &Reconciler{ client: resource.ClientApplicator{ Client: mgr.GetClient(), Applicator: resource.NewAPIPatchingApplicator(mgr.GetClient()), }, pkg: NewNopRevisioner(), log: logging.NewNopLogger(), record: event.NewNopRecorder(), conditions: conditions.ObservedGenerationPropagationManager{}, } for _, f := range opts { f(r) } return r } // Reconcile package. func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { //nolint:gocognit // Reconcilers are complex. Be wary of adding more. log := r.log.WithValues("request", req) log.Debug("Reconciling") ctx, cancel := context.WithTimeout(ctx, reconcileTimeout) defer cancel() p := r.newPackage() if err := r.client.Get(ctx, req.NamespacedName, p); err != nil { // There's no need to requeue if we no longer exist. Otherwise // we'll be requeued implicitly because we return an error. 
log.Debug(errGetPackage, "error", err) return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetPackage) } status := r.conditions.For(p) // Check the pause annotation and return if it has the value "true" // after logging, publishing an event and updating the SYNC status condition if meta.IsPaused(p) { r.record.Event(p, event.Normal(reasonPaused, reconcilePausedMsg)) status.MarkConditions(xpv1.ReconcilePaused().WithMessage(reconcilePausedMsg)) // If the pause annotation is removed, we will have a chance to reconcile again and resume // and if status update fails, we will reconcile again to retry to update the status return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, p), errUpdateStatus) } if c := p.GetCondition(xpv1.ReconcilePaused().Type); c.Reason == xpv1.ReconcilePaused().Reason { p.CleanConditions() // Persist the removal of conditions and return. We'll be requeued // with the updated status and resume reconciliation. return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, p), errUpdateStatus) } // Get existing package revisions. prs := r.newPackageRevisionList() if err := r.client.List(ctx, prs, client.MatchingLabels(map[string]string{v1.LabelParentPackage: p.GetName()})); resource.IgnoreNotFound(err) != nil { err = errors.Wrap(err, errListRevisions) r.record.Event(p, event.Warning(reasonList, err)) return reconcile.Result{}, err } // Rewrite the image path if necessary. We need to do this before looking // for pull secrets, since the rewritten path may use different secrets than // the original. 
imagePath := p.GetSource() rewriteConfigName, newPath, err := r.config.RewritePath(ctx, imagePath) if err != nil { err = errors.Wrap(err, errRewriteImage) p.SetConditions(v1.Unpacking().WithMessage(err.Error())) _ = r.client.Status().Update(ctx, p) r.record.Event(p, event.Warning(reasonImageConfig, err)) return reconcile.Result{}, err } if newPath != "" { imagePath = newPath p.SetAppliedImageConfigRefs(v1.ImageConfigRef{ Name: rewriteConfigName, Reason: v1.ImageConfigReasonRewrite, }) } else { p.ClearAppliedImageConfigRef(v1.ImageConfigReasonRewrite) } p.SetResolvedSource(imagePath) pullSecretConfig, pullSecretFromConfig, err := r.config.PullSecretFor(ctx, p.GetResolvedSource()) if err != nil { err = errors.Wrap(err, errGetPullConfig) status.MarkConditions(v1.Unpacking().WithMessage(err.Error())) _ = r.client.Status().Update(ctx, p) r.record.Event(p, event.Warning(reasonImageConfig, err)) return reconcile.Result{}, err } var secrets []string if pullSecretFromConfig != "" { secrets = append(secrets, pullSecretFromConfig) p.SetAppliedImageConfigRefs(v1.ImageConfigRef{ Name: pullSecretConfig, Reason: v1.ImageConfigReasonSetPullSecret, }) } else { p.ClearAppliedImageConfigRef(v1.ImageConfigReasonSetPullSecret) } revisionName, err := r.pkg.Revision(ctx, p, secrets...) if err != nil { err = errors.Wrap(err, errUnpack) status.MarkConditions(v1.Unpacking().WithMessage(err.Error())) r.record.Event(p, event.Warning(reasonUnpack, err)) if updateErr := r.client.Status().Update(ctx, p); updateErr != nil { return reconcile.Result{}, errors.Wrap(updateErr, errUpdateStatus) } return reconcile.Result{}, err } if revisionName == "" { status.MarkConditions(v1.Unpacking().WithMessage("Waiting for unpack to complete")) r.record.Event(p, event.Normal(reasonUnpack, "Waiting for unpack to complete")) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, p), errUpdateStatus) } // Set the current revision and identifier. 
p.SetCurrentRevision(revisionName) // Use the original source as the identifier, even if it was rewritten by // ImageConfig. The revisioning and dependency resolution logic are all // based on the original package sources, so it's important that we preserve // the original until it's time to actually pull an image. p.SetCurrentIdentifier(p.GetSource()) pr := r.newPackageRevision() maxRevision := int64(0) oldestRevision := int64(math.MaxInt64) oldestRevisionIndex := -1 revisions := prs.GetRevisions() // Check to see if revision already exists. for index, rev := range revisions { revisionNum := rev.GetRevision() // Set max revision to the highest numbered existing revision. if revisionNum > maxRevision { maxRevision = revisionNum } // Set oldest revision to the lowest numbered revision and // record its index. if revisionNum < oldestRevision { oldestRevision = revisionNum oldestRevisionIndex = index } // If revision name is same as current revision, then revision // already exists. if rev.GetName() == p.GetCurrentRevision() { pr = rev // Finish iterating through all revisions to make sure // all non-current revisions are inactive. continue } if rev.GetDesiredState() == v1.PackageRevisionActive { // If revision is not the current revision, set to // inactive. This should always be done, regardless of // the package's revision activation policy. rev.SetDesiredState(v1.PackageRevisionInactive) if err := r.client.Apply(ctx, rev, resource.MustBeControllableBy(p.GetUID())); err != nil { if kerrors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } err = errors.Wrap(err, errUpdateInactivePackageRevision) r.record.Event(p, event.Warning(reasonTransitionRevision, err)) return reconcile.Result{}, err } } } // The current revision should always be the highest numbered revision. if pr.GetRevision() < maxRevision || maxRevision == 0 { pr.SetRevision(maxRevision + 1) } // Check to see if there are revisions eligible for garbage collection. 
if p.GetRevisionHistoryLimit() != nil && *p.GetRevisionHistoryLimit() != 0 && len(revisions) > (int(*p.GetRevisionHistoryLimit())+1) { gcRev := revisions[oldestRevisionIndex] // Find the oldest revision and delete it. if err := r.client.Delete(ctx, gcRev); err != nil { err = errors.Wrap(err, errGCPackageRevision) r.record.Event(p, event.Warning(reasonGarbageCollect, err)) return reconcile.Result{}, err } } health := v1.PackageHealth(pr) if health.Status == corev1.ConditionTrue && p.GetCondition(v1.TypeHealthy).Status != corev1.ConditionTrue { // NOTE(phisco): We don't want to spam the user with events if the // package is already healthy. r.record.Event(p, event.Normal(reasonInstall, "Successfully installed package revision")) } status.MarkConditions(health) if pr.GetUID() == "" && pullSecretConfig != "" { // We only record this event if the revision is new, as we don't want to // spam the user with events if the revision already exists. log.Debug("Selected pull secret from image config store", "image", p.GetResolvedSource(), "pullSecretConfig", pullSecretConfig, "pullSecret", pullSecretFromConfig, "rewriteConfig", rewriteConfigName) r.record.Event(p, event.Normal(reasonImageConfig, fmt.Sprintf("Selected pullSecret %q from ImageConfig %q for registry authentication", pullSecretFromConfig, pullSecretConfig))) } // Create the non-existent package revision. pr.SetName(revisionName) pr.SetLabels(map[string]string{v1.LabelParentPackage: p.GetName()}) // Use the original source; the revision reconciler will rewrite it if // needed. The revision reconciler also inserts packages into the dependency // manager's lock, which must use the original source to ensure dependency // packages have the expected names even when rewritten. 
pr.SetSource(p.GetSource()) pr.SetPackagePullPolicy(p.GetPackagePullPolicy()) pr.SetPackagePullSecrets(p.GetPackagePullSecrets()) pr.SetIgnoreCrossplaneConstraints(p.GetIgnoreCrossplaneConstraints()) pr.SetSkipDependencyResolution(p.GetSkipDependencyResolution()) pr.SetCommonLabels(p.GetCommonLabels()) pwr, pwok := p.(v1.PackageWithRuntime) prwr, prok := pr.(v1.PackageRevisionWithRuntime) if pwok && prok { prwr.SetRuntimeConfigRef(pwr.GetRuntimeConfigRef()) prwr.SetTLSServerSecretName(pwr.GetTLSServerSecretName()) prwr.SetTLSClientSecretName(pwr.GetTLSClientSecretName()) } // If the current revision is not active, and we have an automatic or // undefined activation policy, always activate. if pr.GetDesiredState() != v1.PackageRevisionActive && (p.GetActivationPolicy() == nil || *p.GetActivationPolicy() == v1.AutomaticActivation) { pr.SetDesiredState(v1.PackageRevisionActive) } controlRef := meta.AsController(meta.TypedReferenceTo(p, p.GetObjectKind().GroupVersionKind())) controlRef.BlockOwnerDeletion = ptr.To(true) meta.AddOwnerReference(pr, controlRef) if err := r.client.Apply(ctx, pr, resource.MustBeControllableBy(p.GetUID())); err != nil { if kerrors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } err = errors.Wrap(err, errApplyPackageRevision) r.record.Event(p, event.Warning(reasonInstall, err)) return reconcile.Result{}, err } // Handle changes in labels same := reflect.DeepEqual(pr.GetCommonLabels(), p.GetCommonLabels()) if !same { pr.SetCommonLabels(p.GetCommonLabels()) if err := r.client.Update(ctx, pr); err != nil { if kerrors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } err = errors.Wrap(err, errApplyPackageRevision) r.record.Event(p, event.Warning(reasonInstall, err)) return reconcile.Result{}, err } } status.MarkConditions(v1.Active()) // If current revision is still not active, the package is inactive. 
if pr.GetDesiredState() != v1.PackageRevisionActive { status.MarkConditions(v1.Inactive().WithMessage("Package is inactive")) } // NOTE(hasheddan): when the first package revision is created for a // package, the health of the package is not set until the revision reports // its health. If updating from an existing revision, the package health // will match the health of the old revision until the next reconcile. return pullBasedRequeue(p.GetPackagePullPolicy()), errors.Wrap(r.client.Status().Update(ctx, p), errUpdateStatus) } func enqueueProvidersForImageConfig(kube client.Client, log logging.Logger) handler.EventHandler { return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { ic, ok := o.(*v1beta1.ImageConfig) if !ok { return nil } // We only care about ImageConfigs that have a pull secret. if ic.Spec.Registry == nil || ic.Spec.Registry.Authentication == nil || ic.Spec.Registry.Authentication.PullSecretRef.Name == "" { return nil } // Enqueue all Providers matching the prefixes in the ImageConfig. l := &v1.ProviderList{} if err := kube.List(ctx, l); err != nil { // Nothing we can do, except logging, if we can't list Providers. 
log.Debug("Cannot list providers while attempting to enqueue from ImageConfig", "error", err) return nil } var matches []reconcile.Request for _, p := range l.Items { for _, m := range ic.Spec.MatchImages { if strings.HasPrefix(p.GetSource(), m.Prefix) || strings.HasPrefix(p.GetResolvedSource(), m.Prefix) { log.Debug("Enqueuing provider for image config", "provider", p.Name, "imageConfig", ic.Name) matches = append(matches, reconcile.Request{NamespacedName: types.NamespacedName{Name: p.Name}}) } } } return matches }) } func enqueueConfigurationsForImageConfig(kube client.Client, log logging.Logger) handler.EventHandler { return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { ic, ok := o.(*v1beta1.ImageConfig) if !ok { return nil } // We only care about ImageConfigs that have a pull secret. if ic.Spec.Registry == nil || ic.Spec.Registry.Authentication == nil || ic.Spec.Registry.Authentication.PullSecretRef.Name == "" { return nil } // Enqueue all Configurations matching the prefixes in the ImageConfig. l := &v1.ConfigurationList{} if err := kube.List(ctx, l); err != nil { // Nothing we can do, except logging, if we can't list Configurations. 
log.Debug("Cannot list configurations while attempting to enqueue from ImageConfig", "error", err) return nil } var matches []reconcile.Request for _, c := range l.Items { for _, m := range ic.Spec.MatchImages { if strings.HasPrefix(c.GetSource(), m.Prefix) || strings.HasPrefix(c.GetResolvedSource(), m.Prefix) { log.Debug("Enqueuing configuration for image config", "configuration", c.Name, "imageConfig", ic.Name) matches = append(matches, reconcile.Request{NamespacedName: types.NamespacedName{Name: c.Name}}) } } } return matches }) } func enqueueFunctionsForImageConfig(kube client.Client, log logging.Logger) handler.EventHandler { return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { ic, ok := o.(*v1beta1.ImageConfig) if !ok { return nil } // We only care about ImageConfigs that have a pull secret. if ic.Spec.Registry == nil || ic.Spec.Registry.Authentication == nil || ic.Spec.Registry.Authentication.PullSecretRef.Name == "" { return nil } // Enqueue all Functions matching the prefixes in the ImageConfig. l := &v1.FunctionList{} if err := kube.List(ctx, l); err != nil { // Nothing we can do, except logging, if we can't list Functions. log.Debug("Cannot list functions while attempting to enqueue from ImageConfig", "error", err) return nil } var matches []reconcile.Request for _, fn := range l.Items { for _, m := range ic.Spec.MatchImages { if strings.HasPrefix(fn.GetSource(), m.Prefix) || strings.HasPrefix(fn.GetResolvedSource(), m.Prefix) { log.Debug("Enqueuing function for image config", "function", fn.Name, "imageConfig", ic.Name) matches = append(matches, reconcile.Request{NamespacedName: types.NamespacedName{Name: fn.Name}}) } } } return matches }) }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
	"context"

	"github.com/google/go-containerregistry/pkg/name"
	corev1 "k8s.io/api/core/v1"

	"github.com/crossplane/crossplane-runtime/pkg/errors"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/internal/xpkg"
)

const (
	// errBadReference is returned when a package source cannot be parsed as
	// an OCI image reference.
	errBadReference = "package tag is not a valid reference"
	// errFetchPackage is returned when the package digest cannot be fetched
	// from the remote registry.
	errFetchPackage = "failed to fetch package digest from remote"
)

// Revisioner extracts a revision name for a package source.
type Revisioner interface {
	Revision(ctx context.Context, p v1.Package, extraPullSecrets ...string) (string, error)
}

// PackageRevisioner extracts a revision name for a package source.
type PackageRevisioner struct {
	// fetcher resolves image digests from a registry.
	fetcher xpkg.Fetcher
	// registry is the default registry used when a source does not specify
	// one.
	registry string
}

// A PackageRevisionerOption sets configuration for a package revisioner.
type PackageRevisionerOption func(r *PackageRevisioner)

// WithDefaultRegistry sets the default registry that a package revisioner will use.
func WithDefaultRegistry(registry string) PackageRevisionerOption {
	return func(r *PackageRevisioner) {
		r.registry = registry
	}
}

// NewPackageRevisioner returns a new PackageRevisioner.
func NewPackageRevisioner(fetcher xpkg.Fetcher, opts ...PackageRevisionerOption) *PackageRevisioner {
	r := &PackageRevisioner{
		fetcher: fetcher,
	}

	for _, opt := range opts {
		opt(r)
	}

	return r
}

// Revision extracts a revision name for a package source.
func (r *PackageRevisioner) Revision(ctx context.Context, p v1.Package, extraPullSecrets ...string) (string, error) { pullPolicy := p.GetPackagePullPolicy() if pullPolicy != nil && *pullPolicy == corev1.PullNever { return xpkg.FriendlyID(p.GetName(), p.GetSource()), nil } if pullPolicy != nil && *pullPolicy == corev1.PullIfNotPresent { if p.GetCurrentIdentifier() == p.GetSource() { return p.GetCurrentRevision(), nil } } // Use the package recorded in the status rather than the one in the spec, // since it may have been rewritten by image config. ref, err := name.ParseReference(p.GetResolvedSource(), name.WithDefaultRegistry(r.registry)) if err != nil { return "", errors.Wrap(err, errBadReference) } ps := v1.RefNames(p.GetPackagePullSecrets()) if len(extraPullSecrets) > 0 { ps = append(ps, extraPullSecrets...) } d, err := r.fetcher.Head(ctx, ref, ps...) if err != nil || d == nil { return "", errors.Wrap(err, errFetchPackage) } return xpkg.FriendlyID(p.GetName(), d.Digest.Hex), nil } // NopRevisioner returns an empty revision name. type NopRevisioner struct{} // NewNopRevisioner creates a NopRevisioner. func NewNopRevisioner() *NopRevisioner { return &NopRevisioner{} } // Revision returns an empty revision name and no error. func (d *NopRevisioner) Revision(context.Context, v1.Package, ...string) (string, error) { return "", nil }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package revision

import (
	"context"
	"fmt"
	"strings"

	"github.com/Masterminds/semver"
	"github.com/google/go-containerregistry/pkg/name"
	conregv1 "github.com/google/go-containerregistry/pkg/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	pkgmetav1 "github.com/crossplane/crossplane/apis/pkg/meta/v1"
	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/apis/pkg/v1beta1"
	"github.com/crossplane/crossplane/internal/dag"
	"github.com/crossplane/crossplane/internal/xpkg"
)

const (
	// lockName is the name of the singleton, cluster-scoped Lock object.
	lockName = "lock"

	errNotMeta                   = "meta type is not a valid package"
	errGetOrCreateLock           = "cannot get or create lock"
	errInitDAG                   = "cannot initialize dependency graph from the packages in the lock"
	errFmtIncompatibleDependency = "incompatible dependencies: %s"
	errFmtMissingDependencies    = "missing dependencies: %+v"
	errDependencyNotInGraph      = "dependency is not present in graph"
	errDependencyNotLockPackage  = "dependency in graph is not a lock package"
)

// DependencyManager is a lock on packages.
type DependencyManager interface {
	Resolve(ctx context.Context, meta pkgmetav1.Pkg, pr v1.PackageRevision) (found, installed, invalid int, err error)
	RemoveSelf(ctx context.Context, pr v1.PackageRevision) error
}

// PackageDependencyManager is a resolver for packages.
type PackageDependencyManager struct {
	client      client.Client
	newDag      dag.NewDAGFn
	packageType schema.GroupVersionKind // GVK recorded for self in the Lock
	log         logging.Logger
}

// NewPackageDependencyManager creates a new PackageDependencyManager.
func NewPackageDependencyManager(c client.Client, nd dag.NewDAGFn, pkgType schema.GroupVersionKind, l logging.Logger) *PackageDependencyManager {
	return &PackageDependencyManager{
		client:      c,
		newDag:      nd,
		packageType: pkgType,
		log:         l,
	}
}

// Resolve resolves package dependencies. It returns the number of dependencies
// found in the revision's metadata, the number of those that are installed,
// and the number that are installed but violate their version constraint.
func (m *PackageDependencyManager) Resolve(ctx context.Context, meta pkgmetav1.Pkg, pr v1.PackageRevision) (found, installed, invalid int, err error) { //nolint:gocognit // TODO(negz): Can this be refactored for less complexity?
	// If we are inactive, we don't need to resolve dependencies.
	if pr.GetDesiredState() == v1.PackageRevisionInactive {
		return 0, 0, 0, nil
	}

	// Copy package dependencies into Lock Dependencies.
	sources := make([]v1beta1.Dependency, len(meta.GetDependencies()))
	for i, dep := range meta.GetDependencies() {
		pdep := v1beta1.Dependency{}
		switch {
		// If the GVK and package are specified explicitly they take precedence.
		case dep.APIVersion != nil && dep.Kind != nil && dep.Package != nil:
			pdep.APIVersion = dep.APIVersion
			pdep.Kind = dep.Kind
			pdep.Package = *dep.Package
		case dep.Configuration != nil:
			pdep.Package = *dep.Configuration
			pdep.Type = ptr.To(v1beta1.ConfigurationPackageType)
		case dep.Provider != nil:
			pdep.Package = *dep.Provider
			pdep.Type = ptr.To(v1beta1.ProviderPackageType)
		case dep.Function != nil:
			pdep.Package = *dep.Function
			pdep.Type = ptr.To(v1beta1.FunctionPackageType)
		default:
			return 0, 0, 0, errors.Errorf("encountered an invalid dependency: package dependencies must specify either a valid type, or an explicit apiVersion, kind, and package")
		}
		pdep.Constraints = dep.Version
		sources[i] = pdep
	}

	found = len(sources)

	// Get the lock. It is created on demand if it does not exist yet.
	lock := &v1beta1.Lock{}
	err = m.client.Get(ctx, types.NamespacedName{Name: lockName}, lock)
	if kerrors.IsNotFound(err) {
		lock.Name = lockName
		err = m.client.Create(ctx, lock, &client.CreateOptions{})
	}
	if err != nil {
		return found, installed, invalid, errors.Wrap(err, errGetOrCreateLock)
	}

	prRef, err := name.ParseReference(pr.GetSource(), name.WithDefaultRegistry(""))
	if err != nil {
		return found, installed, invalid, err
	}

	// Build the dependency graph from the current Lock contents. Init returns
	// the nodes that are referenced as dependencies but not present as
	// packages ("implied", i.e. not installed).
	d := m.newDag()
	implied, err := d.Init(v1beta1.ToNodes(lock.Packages...))
	if err != nil {
		return found, installed, invalid, errors.Wrap(err, errInitDAG)
	}

	lockRef := xpkg.ParsePackageSourceFromReference(prRef)
	// NOTE(hasheddan): consider adding health of package to lock so that it can
	// be rolled up to any dependent packages.
	self := v1beta1.LockPackage{
		APIVersion:   ptr.To(m.packageType.GroupVersion().String()),
		Kind:         ptr.To(m.packageType.Kind),
		Name:         pr.GetName(),
		Source:       lockRef,
		Version:      prRef.Identifier(),
		Dependencies: sources,
	}

	// Delete packages in lock with same name and distinct source
	// This is a corner case when source is updated but image SHA is not (i.e.
	// relocate same image to another registry)
	for _, lp := range lock.Packages {
		if self.Name == lp.Name && self.Type == lp.Type && self.Source != lp.Identifier() {
			m.log.Debug("Package with same name and type but different source exists in lock. Removing it.",
				"name", lp.Name,
				"type", ptr.Deref(lp.Type, "Unknown"),
				"old-source", lp.Identifier(),
				"new-source", self.Source,
			)
			if err := m.RemoveSelf(ctx, pr); err != nil {
				return found, installed, invalid, err
			}
			// refresh the lock to be in sync with the contents
			if err = m.client.Get(ctx, types.NamespacedName{Name: lockName}, lock); err != nil {
				return found, installed, invalid, err
			}
			break
		}
	}

	prExists := false
	for _, lp := range lock.Packages {
		if lp.Name == pr.GetName() {
			prExists = true
			break
		}
	}

	// If we don't exist in lock then we should add self.
	if !prExists {
		lock.Packages = append(lock.Packages, self)
		if err := m.client.Update(ctx, lock); err != nil {
			return found, installed, invalid, err
		}

		// Package may exist in the graph as a dependency, or may not exist at
		// all. We need to either convert it to a full node or add it.
		d.AddOrUpdateNodes(&self)

		// If any direct dependencies are missing we skip checking for
		// transitive ones.
		var missing []dag.Node
		for _, dep := range self.Dependencies {
			if d.NodeExists(dep.Identifier()) {
				installed++
				continue
			}
			// NOTE(review): taking the address of the loop variable is only
			// correct under Go 1.22+ per-iteration loop-variable semantics;
			// on older toolchains every element would alias the last
			// iteration — confirm the module's Go version.
			missing = append(missing, &dep)
		}
		if installed != found {
			return found, installed, invalid, errors.Errorf(errFmtMissingDependencies, NDependenciesAndSomeMore(3, missing))
		}
	}

	// Trace the full (transitive) dependency tree of this package.
	tree, err := d.TraceNode(lockRef)
	if err != nil {
		return found, installed, invalid, err
	}
	found = len(tree)
	installed = found

	// Check if any dependencies or transitive dependencies are missing (implied).
	var missing []dag.Node
	for _, imp := range implied {
		if _, ok := tree[imp.Identifier()]; ok {
			installed--
			missing = append(missing, imp)
		}
	}
	if len(missing) != 0 {
		return found, installed, invalid, errors.Errorf(errFmtMissingDependencies, NDependenciesAndSomeMore(3, missing))
	}

	// All of our dependencies and transitive dependencies must exist. Check
	// that neighbors have valid versions.
	var invalidDeps []string
	for _, dep := range self.Dependencies {
		n, err := d.GetNode(dep.Package)
		if err != nil {
			return found, installed, invalid, errors.New(errDependencyNotInGraph)
		}
		lp, ok := n.(*v1beta1.LockPackage)
		if !ok {
			return found, installed, invalid, errors.New(errDependencyNotLockPackage)
		}
		// Check if the constraint is a digest, if so, compare it directly.
		if d, err := conregv1.NewHash(dep.Constraints); err == nil {
			if lp.Version != d.String() {
				return found, installed, invalid, errors.Errorf("existing package %s@%s is incompatible with constraint %s", lp.Identifier(), lp.Version, strings.TrimSpace(dep.Constraints))
			}
			continue
		}
		// Otherwise interpret the constraint as a semver range.
		c, err := semver.NewConstraint(dep.Constraints)
		if err != nil {
			return found, installed, invalid, err
		}
		v, err := semver.NewVersion(lp.Version)
		if err != nil {
			return found, installed, invalid, err
		}
		if !c.Check(v) {
			s := fmt.Sprintf("existing package %s@%s", lp.Identifier(), lp.Version)
			if dep.Constraints != "" {
				s = fmt.Sprintf("%s is incompatible with constraint %s", s, strings.TrimSpace(dep.Constraints))
			}
			invalidDeps = append(invalidDeps, s)
		}
	}
	invalid = len(invalidDeps)
	if invalid > 0 {
		return found, installed, invalid, errors.Errorf(errFmtIncompatibleDependency, strings.Join(invalidDeps, "; "))
	}
	return found, installed, invalid, nil
}

// RemoveSelf removes a package from the lock.
func (m *PackageDependencyManager) RemoveSelf(ctx context.Context, pr v1.PackageRevision) error {
	// Get the lock.
	lock := &v1beta1.Lock{}
	err := m.client.Get(ctx, types.NamespacedName{Name: lockName}, lock)
	if kerrors.IsNotFound(err) {
		// If lock does not exist then we don't need to remove self.
		return nil
	}
	if err != nil {
		return err
	}

	// Find self and remove. If we don't exist, its a no-op.
	for i, lp := range lock.Packages {
		if lp.Name == pr.GetName() {
			m.log.Debug("Removing package revision from lock", "name", lp.Name)
			lock.Packages = append(lock.Packages[:i], lock.Packages[i+1:]...)
			return m.client.Update(ctx, lock)
		}
	}
	return nil
}

// NDependenciesAndSomeMore returns the first n dependencies in detail, and a
// summary of how many more exist.
func NDependenciesAndSomeMore(n int, d []dag.Node) string {
	out := make([]string, len(d))
	for i := range d {
		if d[i].GetConstraints() == "" {
			out[i] = fmt.Sprintf("%q", d[i].Identifier())
			continue
		}
		out[i] = fmt.Sprintf("%q (%s)", d[i].Identifier(), d[i].GetConstraints())
	}
	return resource.StableNAndSomeMore(n, out)
}
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package revision

import (
	"context"
	"fmt"
	"strings"

	"golang.org/x/sync/errgroup"
	admv1 "k8s.io/api/admissionregistration/v1"
	corev1 "k8s.io/api/core/v1"
	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
)

const (
	errAssertResourceObj = "cannot assert object to resource.Object"
	errAssertClientObj   = "cannot assert object to client.Object"

	errConversionWithNoWebhookCA    = "cannot deploy a CRD with webhook conversion strategy without having a TLS bundle"
	errGetWebhookTLSSecret          = "cannot get webhook tls secret"
	errWebhookSecretWithoutCABundle = "the value for the key tls.crt cannot be empty"

	errFmtGetOwnedObject    = "cannot get owned object: %s/%s"
	errFmtUpdateOwnedObject = "cannot update owned object: %s/%s"
)

const (
	// ServicePort is the port number used by package services for webhook communication.
	ServicePort = 9443
)

// An Establisher establishes control or ownership of a set of resources in the
// API server by checking that control or ownership can be established for all
// resources and then establishing it.
type Establisher interface {
	Establish(ctx context.Context, objects []runtime.Object, parent v1.PackageRevision, control bool) ([]xpv1.TypedReference, error)
	ReleaseObjects(ctx context.Context, parent v1.PackageRevision) error
}

// NewNopEstablisher returns a new NopEstablisher.
func NewNopEstablisher() *NopEstablisher {
	return &NopEstablisher{}
}

// NopEstablisher does nothing.
type NopEstablisher struct{}

// Establish does nothing.
func (*NopEstablisher) Establish(_ context.Context, _ []runtime.Object, _ v1.PackageRevision, _ bool) ([]xpv1.TypedReference, error) {
	return nil, nil
}

// ReleaseObjects does nothing.
func (*NopEstablisher) ReleaseObjects(_ context.Context, _ v1.PackageRevision) error {
	return nil
}

// APIEstablisher establishes control or ownership of resources in the API
// server for a parent.
type APIEstablisher struct {
	client    client.Client
	namespace string // namespace used for webhook services and TLS secrets

	// MaxConcurrentPackageEstablishers bounds the errgroup parallelism used
	// when validating/establishing objects.
	MaxConcurrentPackageEstablishers int
}

// NewAPIEstablisher creates a new APIEstablisher.
func NewAPIEstablisher(client client.Client, namespace string, maxConcurrentPackageEstablishers int) *APIEstablisher {
	return &APIEstablisher{
		client:                           client,
		namespace:                        namespace,
		MaxConcurrentPackageEstablishers: maxConcurrentPackageEstablishers,
	}
}

// currentDesired caches resources while checking for control or ownership so
// that they do not have to be fetched from the API server again when control or
// ownership is established.
type currentDesired struct {
	Current resource.Object
	Desired resource.Object
	Exists  bool
}

// Establish checks that control or ownership of resources can be established by
// parent, then establishes it.
func (e *APIEstablisher) Establish(ctx context.Context, objs []runtime.Object, parent v1.PackageRevision, control bool) ([]xpv1.TypedReference, error) { err := e.addLabels(objs, parent) if err != nil { return nil, err } allObjs, err := e.validate(ctx, objs, parent, control) if err != nil { return nil, err } resourceRefs, err := e.establish(ctx, allObjs, parent, control) if err != nil { return nil, err } return resourceRefs, nil } // ReleaseObjects removes control of owned resources in the API server for a // package revision. func (e *APIEstablisher) ReleaseObjects(ctx context.Context, parent v1.PackageRevision) error { //nolint:gocognit // complexity coming from parallelism. // Note(turkenh): We rely on status.objectRefs to get the list of objects // that are controlled by the package revision. Relying on the status is // not ideal as it might get lost (e.g. if the status subresource is // not properly restored after a backup/restore operation). However, we will // handle this by conditionally fetching/parsing package if there is no // referenced resources available and rebuilding the status. // In the next reconciliation loop, and we will be able to remove the // control/ownership of the objects using the new status. allObjs := parent.GetObjects() if len(allObjs) == 0 { return nil } g, ctx := errgroup.WithContext(ctx) g.SetLimit(e.MaxConcurrentPackageEstablishers) for _, ref := range allObjs { g.Go(func() error { select { case <-ctx.Done(): return ctx.Err() default: } u := unstructured.Unstructured{} u.SetAPIVersion(ref.APIVersion) u.SetKind(ref.Kind) u.SetName(ref.Name) if err := e.client.Get(ctx, types.NamespacedName{Name: u.GetName()}, &u); err != nil { if kerrors.IsNotFound(err) { // This is not expected, but still not an error for releasing objects. 
return nil } return errors.Wrapf(err, errFmtGetOwnedObject, u.GetKind(), u.GetName()) } ors := u.GetOwnerReferences() found := false changed := false for i := range ors { if ors[i].UID == parent.GetUID() { found = true if ors[i].Controller != nil && *ors[i].Controller { ors[i].Controller = ptr.To(false) changed = true } break } // Note(turkenh): What if we cannot find our UID in the owner // references? This is not expected unless another party stripped // out ownerRefs. I believe this is a fairly unlikely scenario, // and we can ignore it for now especially considering that if that // happens active revision or the package itself will still take // over the ownership of such resources. } if !found { // Make sure the package revision exists as an owner. ors = append(ors, meta.AsOwner(meta.TypedReferenceTo(parent, parent.GetObjectKind().GroupVersionKind()))) changed = true } if changed { u.SetOwnerReferences(ors) if err := e.client.Update(ctx, &u); err != nil { return errors.Wrapf(err, errFmtUpdateOwnedObject, u.GetKind(), u.GetName()) } } return nil }) } return g.Wait() } func (e *APIEstablisher) addLabels(objs []runtime.Object, parent v1.PackageRevision) error { commonLabels := parent.GetCommonLabels() for _, obj := range objs { // convert to resource.Object to be able to access metadata d, ok := obj.(resource.Object) if !ok { return errors.New(errConfResourceObject) } labels := d.GetLabels() if labels != nil { for key, value := range commonLabels { labels[key] = value } } else { d.SetLabels(commonLabels) } } return nil } func (e *APIEstablisher) validate(ctx context.Context, objs []runtime.Object, parent v1.PackageRevision, control bool) (allObjs []currentDesired, err error) { //nolint:gocognit // TODO(negz): Refactor this to break up complexity. 
var webhookTLSCert []byte if parentWithRuntime, ok := parent.(v1.PackageRevisionWithRuntime); ok && control { webhookTLSCert, err = e.getWebhookTLSCert(ctx, parentWithRuntime) if err != nil { return nil, err } } g, ctx := errgroup.WithContext(ctx) g.SetLimit(e.MaxConcurrentPackageEstablishers) out := make(chan currentDesired, len(objs)) for _, res := range objs { g.Go(func() error { // Assert desired object to resource.Object so that we can access its // metadata. desired, ok := res.(resource.Object) if !ok { return errors.New(errAssertResourceObj) } if control { if err := e.enrichControlledResource(res, webhookTLSCert, parent); err != nil { return err } } // Make a copy of the desired object to be populated with existing // object, if it exists. resCopy := res.DeepCopyObject() current, ok := resCopy.(client.Object) if !ok { return errors.New(errAssertClientObj) } err := e.client.Get(ctx, types.NamespacedName{Name: desired.GetName(), Namespace: desired.GetNamespace()}, current) if resource.IgnoreNotFound(err) != nil { return err } // If resource does not already exist, we must attempt to dry run create // it. if kerrors.IsNotFound(err) { // We will not create a resource if we are not going to control it, // so we don't need to check with dry run. if control { if err := e.create(ctx, desired, parent, client.DryRunAll); err != nil { return err } } // Add to objects as not existing. select { case out <- currentDesired{Desired: desired, Current: nil, Exists: false}: return nil case <-ctx.Done(): return ctx.Err() } } if err := e.update(ctx, current, desired, parent, control, client.DryRunAll); err != nil { return err } // Add to objects as existing. 
select { case out <- currentDesired{Desired: desired, Current: current, Exists: true}: return nil case <-ctx.Done(): return ctx.Err() } }) } if err := g.Wait(); err != nil { return nil, err } close(out) for obj := range out { allObjs = append(allObjs, obj) } return allObjs, nil } func (e *APIEstablisher) enrichControlledResource(res runtime.Object, webhookTLSCert []byte, parent v1.PackageRevision) error { //nolint:gocognit // just a switch // The generated webhook configurations have a static hard-coded name // that the developers of the providers can't affect. Here, we make sure // to distinguish one from the other by setting the name to the parent // since there is always a single ValidatingWebhookConfiguration and/or // single MutatingWebhookConfiguration object in a provider package. // See https://github.com/kubernetes-sigs/controller-tools/issues/658 switch conf := res.(type) { case *admv1.ValidatingWebhookConfiguration: if len(webhookTLSCert) == 0 { return nil } if pkgRef, ok := GetPackageOwnerReference(parent); ok { conf.SetName(fmt.Sprintf("crossplane-%s-%s", strings.ToLower(pkgRef.Kind), pkgRef.Name)) } for i := range conf.Webhooks { conf.Webhooks[i].ClientConfig.CABundle = webhookTLSCert if conf.Webhooks[i].ClientConfig.Service == nil { conf.Webhooks[i].ClientConfig.Service = &admv1.ServiceReference{} } conf.Webhooks[i].ClientConfig.Service.Name = parent.GetLabels()[v1.LabelParentPackage] conf.Webhooks[i].ClientConfig.Service.Namespace = e.namespace conf.Webhooks[i].ClientConfig.Service.Port = ptr.To[int32](ServicePort) } case *admv1.MutatingWebhookConfiguration: if len(webhookTLSCert) == 0 { return nil } if pkgRef, ok := GetPackageOwnerReference(parent); ok { conf.SetName(fmt.Sprintf("crossplane-%s-%s", strings.ToLower(pkgRef.Kind), pkgRef.Name)) } for i := range conf.Webhooks { conf.Webhooks[i].ClientConfig.CABundle = webhookTLSCert if conf.Webhooks[i].ClientConfig.Service == nil { conf.Webhooks[i].ClientConfig.Service = &admv1.ServiceReference{} } 
conf.Webhooks[i].ClientConfig.Service.Name = parent.GetLabels()[v1.LabelParentPackage] conf.Webhooks[i].ClientConfig.Service.Namespace = e.namespace conf.Webhooks[i].ClientConfig.Service.Port = ptr.To[int32](ServicePort) } case *extv1.CustomResourceDefinition: if conf.Spec.Conversion != nil && conf.Spec.Conversion.Strategy == extv1.WebhookConverter { if len(webhookTLSCert) == 0 { return errors.New(errConversionWithNoWebhookCA) } if conf.Spec.Conversion.Webhook == nil { conf.Spec.Conversion.Webhook = &extv1.WebhookConversion{} } if conf.Spec.Conversion.Webhook.ClientConfig == nil { conf.Spec.Conversion.Webhook.ClientConfig = &extv1.WebhookClientConfig{} } if conf.Spec.Conversion.Webhook.ClientConfig.Service == nil { conf.Spec.Conversion.Webhook.ClientConfig.Service = &extv1.ServiceReference{} } conf.Spec.Conversion.Webhook.ClientConfig.CABundle = webhookTLSCert conf.Spec.Conversion.Webhook.ClientConfig.Service.Name = parent.GetLabels()[v1.LabelParentPackage] conf.Spec.Conversion.Webhook.ClientConfig.Service.Namespace = e.namespace conf.Spec.Conversion.Webhook.ClientConfig.Service.Port = ptr.To[int32](ServicePort) } } return nil } // getWebhookTLSCert returns the TLS certificate of the webhook server if the // revision has a TLS server secret name. 
func (e *APIEstablisher) getWebhookTLSCert(ctx context.Context, parentWithRuntime v1.PackageRevisionWithRuntime) (webhookTLSCert []byte, err error) { tlsServerSecretName := parentWithRuntime.GetTLSServerSecretName() if tlsServerSecretName == nil { return nil, nil } s := &corev1.Secret{} nn := types.NamespacedName{Name: *tlsServerSecretName, Namespace: e.namespace} err = e.client.Get(ctx, nn, s) if err != nil { return nil, errors.Wrap(err, errGetWebhookTLSSecret) } if len(s.Data["tls.crt"]) == 0 { return nil, errors.New(errWebhookSecretWithoutCABundle) } webhookTLSCert = s.Data["tls.crt"] return webhookTLSCert, nil } func (e *APIEstablisher) establish(ctx context.Context, allObjs []currentDesired, parent client.Object, control bool) ([]xpv1.TypedReference, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(e.MaxConcurrentPackageEstablishers) out := make(chan xpv1.TypedReference, len(allObjs)) for _, cd := range allObjs { g.Go(func() error { if !cd.Exists { // Only create a missing resource if we are going to control it. // This prevents an inactive revision from racing to create a // resource before an active revision of the same parent. 
if control { if err := e.create(ctx, cd.Desired, parent); err != nil { return err } } select { case out <- *meta.TypedReferenceTo(cd.Desired, cd.Desired.GetObjectKind().GroupVersionKind()): return nil case <-ctx.Done(): return ctx.Err() } } if err := e.update(ctx, cd.Current, cd.Desired, parent, control); err != nil { return err } select { case out <- *meta.TypedReferenceTo(cd.Desired, cd.Desired.GetObjectKind().GroupVersionKind()): return nil case <-ctx.Done(): return ctx.Err() } }) } if err := g.Wait(); err != nil { return nil, err } close(out) resourceRefs := []xpv1.TypedReference{} for ref := range out { resourceRefs = append(resourceRefs, ref) } return resourceRefs, nil } func (e *APIEstablisher) create(ctx context.Context, obj resource.Object, parent resource.Object, opts ...client.CreateOption) error { refs := []metav1.OwnerReference{ meta.AsController(meta.TypedReferenceTo(parent, parent.GetObjectKind().GroupVersionKind())), } // We add the parent as `owner` of the resources so that the resource doesn't // get deleted when the new revision doesn't include it in order not to lose // user data, such as custom resources of an old CRD. if pkgRef, ok := GetPackageOwnerReference(parent); ok { pkgRef.Controller = ptr.To(false) refs = append(refs, pkgRef) } // Overwrite any owner references on the desired object. obj.SetOwnerReferences(refs) return e.client.Create(ctx, obj, opts...) } func (e *APIEstablisher) update(ctx context.Context, current, desired resource.Object, parent resource.Object, control bool, opts ...client.UpdateOption) error { // We add the parent as `owner` of the resources so that the resource doesn't // get deleted when the new revision doesn't include it in order not to lose // user data, such as custom resources of an old CRD. 
if pkgRef, ok := GetPackageOwnerReference(parent); ok { pkgRef.Controller = ptr.To(false) meta.AddOwnerReference(current, pkgRef) } if !control { meta.AddOwnerReference(current, meta.AsOwner(meta.TypedReferenceTo(parent, parent.GetObjectKind().GroupVersionKind()))) return e.client.Update(ctx, current, opts...) } // If desire is to control object, we attempt to update the object by // setting the desired owner references equal to that of the current, adding // a controller reference to the parent, and setting the desired resource // version to that of the current. desired.SetOwnerReferences(current.GetOwnerReferences()) if err := meta.AddControllerReference(desired, meta.AsController(meta.TypedReferenceTo(parent, parent.GetObjectKind().GroupVersionKind()))); err != nil { return err } desired.SetResourceVersion(current.GetResourceVersion()) return e.client.Update(ctx, desired, opts...) } // GetPackageOwnerReference returns the owner reference that points to the owner // package of given revision, if it can find one. func GetPackageOwnerReference(rev resource.Object) (metav1.OwnerReference, bool) { name := rev.GetLabels()[v1.LabelParentPackage] for _, owner := range rev.GetOwnerReferences() { if owner.Name == name { return owner, true } } return metav1.OwnerReference{}, false }
/* Copyright 2020 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package revision import ( "bytes" "context" "errors" "io" "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/crossplane/crossplane-runtime/pkg/parser" "github.com/crossplane/crossplane-runtime/pkg/test" pkgmetav1 "github.com/crossplane/crossplane/apis/pkg/meta/v1" v1 "github.com/crossplane/crossplane/apis/pkg/v1" "github.com/crossplane/crossplane/apis/pkg/v1beta1" "github.com/crossplane/crossplane/internal/dag" dagfake "github.com/crossplane/crossplane/internal/dag/fake" "github.com/crossplane/crossplane/internal/xpkg" ) var ( metaScheme *runtime.Scheme objScheme *runtime.Scheme linter = xpkg.NewProviderLinter() ) func init() { var err error metaScheme, err = xpkg.BuildMetaScheme() if err != nil { panic(err) } objScheme, err = xpkg.BuildObjectScheme() if err != nil { panic(err) } } func newFuzzDag(ff *fuzz.ConsumeFuzzer) (func() dag.DAG, error) { traceNodeMap := make(map[string]dag.Node) err := ff.FuzzMap(&traceNodeMap) if err != nil { return func() dag.DAG { return nil }, err } lp := &v1beta1.LockPackage{} err = ff.GenerateStruct(lp) if err != nil { return func() dag.DAG { return nil }, err } return func() dag.DAG { return &dagfake.MockDag{ MockInit: func(_ []dag.Node) ([]dag.Node, error) { return nil, nil }, MockNodeExists: func(_ string) bool { return true }, MockTraceNode: func(_ string) (map[string]dag.Node, error) { 
return traceNodeMap, nil }, MockGetNode: func(_ string) (dag.Node, error) { return lp, nil }, } }, nil } func getFuzzMockClient(ff *fuzz.ConsumeFuzzer) (*test.MockClient, error) { lockPackages := make([]v1beta1.LockPackage, 0) ff.CreateSlice(&lockPackages) if len(lockPackages) == 0 { return nil, errors.New("No packages created") } return &test.MockClient{ MockGet: test.NewMockGetFn(nil, func(obj client.Object) error { l := obj.(*v1beta1.Lock) l.Packages = lockPackages return nil }), MockUpdate: test.NewMockUpdateFn(nil), }, nil } func FuzzRevisionControllerPackageHandling(f *testing.F) { f.Fuzz(func(_ *testing.T, data, revisionData []byte) { ff := fuzz.NewConsumer(revisionData) p := parser.New(metaScheme, objScheme) r := io.NopCloser(bytes.NewReader(data)) pkg, err := p.Parse(context.Background(), r) if err != nil { return } if len(pkg.GetMeta()) == 0 { return } if len(pkg.GetObjects()) == 0 { return } prs := &v1.PackageRevisionSpec{} ff.GenerateStruct(prs) pr := &v1.ConfigurationRevision{Spec: *prs} if err := linter.Lint(pkg); err != nil { return } pkgMeta, _ := xpkg.TryConvertToPkg(pkg.GetMeta()[0], &pkgmetav1.Provider{}, &pkgmetav1.Configuration{}) c, err := getFuzzMockClient(ff) if err != nil { return } fd, err := newFuzzDag(ff) if err != nil { return } pm := &PackageDependencyManager{ client: c, newDag: fd, } _, _, _, _ = pm.Resolve(context.Background(), pkgMeta, pr) }) }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package revision

import (
	"archive/tar"
	"context"
	"io"
	"path/filepath"

	"github.com/google/go-containerregistry/pkg/name"
	"github.com/google/go-containerregistry/pkg/v1/mutate"
	"github.com/google/go-containerregistry/pkg/v1/validate"

	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/parser"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/internal/xpkg"
)

const (
	errBadReference            = "package tag is not a valid reference"
	errFetchPackage            = "failed to fetch package from remote"
	errGetManifest             = "failed to get package image manifest from remote"
	errFetchLayer              = "failed to fetch annotated base layer from remote"
	errGetUncompressed         = "failed to get uncompressed contents from layer"
	errMultipleAnnotatedLayers = "package is invalid due to multiple annotated base layers"
	errFmtNoPackageFileFound   = "couldn't find \"" + xpkg.StreamFile + "\" file after checking %d files in the archive (annotated layer: %v)"
	errFmtMaxManifestLayers    = "package has %d layers, but only %d are allowed"
	errValidateLayer           = "invalid package layer"
	errValidateImage           = "invalid package image"
)

const (
	// layerAnnotation is the OCI layer annotation key used to mark the xpkg
	// base layer.
	layerAnnotation     = "io.crossplane.xpkg"
	baseAnnotationValue = "base"
	// maxLayers is the maximum number of layers an image can have.
	maxLayers = 256
)

// ImageBackend is a backend for parser.
type ImageBackend struct {
	// registry is the default registry used when a package source does not
	// specify one.
	registry string
	// fetcher pulls package images from a remote registry.
	fetcher xpkg.Fetcher
}

// An ImageBackendOption sets configuration for an image backend.
type ImageBackendOption func(i *ImageBackend)

// WithDefaultRegistry sets the default registry that an image backend will use.
func WithDefaultRegistry(registry string) ImageBackendOption {
	return func(i *ImageBackend) {
		i.registry = registry
	}
}

// NewImageBackend creates a new image backend.
func NewImageBackend(fetcher xpkg.Fetcher, opts ...ImageBackendOption) *ImageBackend {
	i := &ImageBackend{
		fetcher: fetcher,
	}
	for _, opt := range opts {
		opt(i)
	}
	return i
}

// Init initializes an ImageBackend. It fetches the package image, locates the
// layer (annotated base layer if present, otherwise the flattened filesystem)
// containing the package YAML stream, and returns a ReadCloser positioned at
// that stream.
func (i *ImageBackend) Init(ctx context.Context, bo ...parser.BackendOption) (io.ReadCloser, error) {
	// NOTE(hasheddan): we use nestedBackend here because simultaneous
	// reconciles of providers or configurations can lead to the package
	// revision being overwritten mid-execution in the shared image backend when
	// it is a member of the image backend struct. We could introduce a lock
	// here, but there is no reason why a given reconcile should require
	// exclusive access to the image backend other than its poor design. We
	// should consider restructuring the parser backend interface to better
	// accommodate for shared, thread-safe backends.
	n := &nestedBackend{}
	for _, o := range bo {
		o(n)
	}
	// Use the package recorded in the status rather than the one from the spec,
	// since it may have been rewritten by an image config.
	ref, err := name.ParseReference(n.pr.GetResolvedSource(), name.WithDefaultRegistry(i.registry))
	if err != nil {
		return nil, errors.Wrap(err, errBadReference)
	}
	// Fetch image from registry.
	ps := v1.RefNames(n.pr.GetPackagePullSecrets())
	if n.pullSecretFromConfig != "" {
		ps = append(ps, n.pullSecretFromConfig)
	}
	img, err := i.fetcher.Fetch(ctx, ref, ps...)
	if err != nil {
		return nil, errors.Wrap(err, errFetchPackage)
	}
	// Get image manifest.
	manifest, err := img.Manifest()
	if err != nil {
		return nil, errors.Wrap(err, errGetManifest)
	}
	// Check that the image has less than the maximum allowed number of layers.
	if nLayers := len(manifest.Layers); nLayers > maxLayers {
		return nil, errors.Errorf(errFmtMaxManifestLayers, nLayers, maxLayers)
	}
	// Determine if the image is using annotated layers.
	var tarc io.ReadCloser
	foundAnnotated := false
	for _, l := range manifest.Layers {
		if a, ok := l.Annotations[layerAnnotation]; !ok || a != baseAnnotationValue {
			continue
		}
		// NOTE(hasheddan): the xpkg specification dictates that only one layer
		// descriptor may be annotated as xpkg base. Since iterating through all
		// descriptors is relatively inexpensive, we opt to do so in order to
		// verify that we aren't just using the first layer annotated as xpkg
		// base.
		if foundAnnotated {
			return nil, errors.New(errMultipleAnnotatedLayers)
		}
		foundAnnotated = true
		layer, err := img.LayerByDigest(l.Digest)
		if err != nil {
			return nil, errors.Wrap(err, errFetchLayer)
		}
		if err := validate.Layer(layer); err != nil {
			return nil, errors.Wrap(err, errValidateLayer)
		}
		tarc, err = layer.Uncompressed()
		if err != nil {
			return nil, errors.Wrap(err, errGetUncompressed)
		}
	}
	// If we still don't have content then we need to flatten image filesystem.
	if !foundAnnotated {
		if err := validate.Image(img); err != nil {
			return nil, errors.Wrap(err, errValidateImage)
		}
		tarc = mutate.Extract(img)
	}
	// The ReadCloser is an uncompressed tarball, either consisting of annotated
	// layer contents or flattened filesystem content. Either way, we only want
	// the package YAML stream.
	t := tar.NewReader(tarc)
	var read int
	for {
		h, err := t.Next()
		if err != nil {
			// io.EOF lands here too: the archive had no package file.
			return nil, errors.Wrapf(err, errFmtNoPackageFileFound, read, foundAnnotated)
		}
		if filepath.Base(h.Name) == xpkg.StreamFile {
			break
		}
		read++
	}
	// NOTE(hasheddan): we return a JoinedReadCloser such that closing will free
	// resources allocated to the underlying ReadCloser. See
	// https://github.com/google/go-containerregistry/blob/329563766ce8131011c25fd8758a25d94d9ad81b/pkg/v1/mutate/mutate.go#L222
	// for more info.
	return xpkg.JoinedReadCloser(t, tarc), nil
}

// nestedBackend is a nop parser backend that conforms to the parser backend
// interface to allow holding intermediate data passed via parser backend
// options.
// NOTE(hasheddan): see usage in ImageBackend Init() for reasoning.
type nestedBackend struct {
	pr                   v1.PackageRevision
	pullSecretFromConfig string
}

// Init is a nop because nestedBackend is not actually meant to act as a
// parser backend.
func (n *nestedBackend) Init(_ context.Context, _ ...parser.BackendOption) (io.ReadCloser, error) {
	return nil, nil
}

// PackageRevision sets the package revision for ImageBackend.
func PackageRevision(pr v1.PackageRevision) parser.BackendOption {
	return func(p parser.Backend) {
		i, ok := p.(*nestedBackend)
		if !ok {
			// Silently ignore backends of any other type.
			return
		}
		i.pr = pr
	}
}

// PullSecretFromConfig sets the image config pull secret for ImageBackend.
func PullSecretFromConfig(secret string) parser.BackendOption {
	return func(p parser.Backend) {
		i, ok := p.(*nestedBackend)
		if !ok {
			// Silently ignore backends of any other type.
			return
		}
		i.pullSecretFromConfig = secret
	}
}
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package revision implements the Crossplane Package Revision controllers.
package revision

import (
	"context"
	"fmt"
	"io"
	"sort"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/kubernetes"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/conditions"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/event"
	"github.com/crossplane/crossplane-runtime/pkg/feature"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/parser"
	"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	pkgmetav1 "github.com/crossplane/crossplane/apis/pkg/meta/v1"
	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/apis/pkg/v1beta1"
	"github.com/crossplane/crossplane/internal/controller/pkg/controller"
	"github.com/crossplane/crossplane/internal/dag"
	"github.com/crossplane/crossplane/internal/features"
	"github.com/crossplane/crossplane/internal/version"
	"github.com/crossplane/crossplane/internal/xpkg"
)

const (
	// reconcileTimeout bounds a single reconcile pass.
	reconcileTimeout = 3 * time.Minute

	// maxPackageSize is the max size of a package parsed by the parser.
	maxPackageSize = 200 << 20 // 200 MB
)

const (
	finalizer = "revision.pkg.crossplane.io"

	errGetPackageRevision = "cannot get package revision"
	errUpdateStatus       = "cannot update package revision status"

	errDeleteCache = "cannot remove package contents from cache"
	errGetCache    = "cannot get package contents from cache"

	errPullPolicyNever = "failed to get pre-cached package with pull policy Never"

	errAddFinalizer    = "cannot add package revision finalizer"
	errRemoveFinalizer = "cannot remove package revision finalizer"

	errGetPullConfig = "cannot get image pull secret from config"
	errRewriteImage  = "cannot rewrite image path using config"

	errDeactivateRevision = "cannot deactivate package revision"

	errInitParserBackend = "cannot initialize parser backend"
	errParsePackage      = "cannot parse package contents"
	errLintPackage       = "linting package contents failed"
	errNotOneMeta        = "cannot install package with multiple meta types"
	errIncompatible      = "incompatible Crossplane version"

	errEstablishControl = "cannot establish control of object"
	errReleaseObjects   = "cannot release objects"

	errUpdateMeta = "cannot update package revision object metadata"

	errRemoveLock  = "cannot remove package revision from Lock"
	errResolveDeps = "cannot resolve package dependencies"

	errConfResourceObject = "cannot convert to resource.Object"

	errCannotInitializeHostClientSet = "failed to initialize host clientset with in cluster config"
	errCannotBuildMetaSchema         = "cannot build meta scheme for package parser"
	errCannotBuildObjectSchema       = "cannot build object scheme for package parser"
	errCannotBuildFetcher            = "cannot build fetcher for package parser"

	reconcilePausedMsg = "Reconciliation (including deletion) is paused via the pause annotation"
)

// Event reasons.
const (
	reasonImageConfig  event.Reason = "ImageConfigSelection"
	reasonParse        event.Reason = "ParsePackage"
	reasonLint         event.Reason = "LintPackage"
	reasonDependencies event.Reason = "ResolveDependencies"
	reasonSync         event.Reason = "SyncPackage"
	reasonDeactivate   event.Reason = "DeactivateRevision"
	reasonPaused       event.Reason = "ReconciliationPaused"
)

// ReconcilerOption is used to configure the Reconciler.
type ReconcilerOption func(*Reconciler)

// WithClientApplicator specifies how the Reconciler should interact with the
// Kubernetes API.
func WithClientApplicator(ca resource.ClientApplicator) ReconcilerOption {
	return func(r *Reconciler) {
		r.client = ca
	}
}

// WithCache specifies how the Reconciler should cache package contents.
func WithCache(c xpkg.PackageCache) ReconcilerOption {
	return func(r *Reconciler) {
		r.cache = c
	}
}

// WithNewPackageRevisionFn determines the type of package being reconciled.
func WithNewPackageRevisionFn(f func() v1.PackageRevision) ReconcilerOption {
	return func(r *Reconciler) {
		r.newPackageRevision = f
	}
}

// WithLogger specifies how the Reconciler should log messages.
func WithLogger(log logging.Logger) ReconcilerOption {
	return func(r *Reconciler) {
		r.log = log
	}
}

// WithRecorder specifies how the Reconciler should record Kubernetes events.
func WithRecorder(er event.Recorder) ReconcilerOption {
	return func(r *Reconciler) {
		r.record = er
	}
}

// WithFinalizer specifies how the Reconciler should finalize package revisions.
func WithFinalizer(f resource.Finalizer) ReconcilerOption {
	return func(r *Reconciler) {
		r.revision = f
	}
}

// WithDependencyManager specifies how the Reconciler should manage dependencies.
func WithDependencyManager(m DependencyManager) ReconcilerOption {
	return func(r *Reconciler) {
		r.lock = m
	}
}

// WithEstablisher specifies how the Reconciler should establish package resources.
func WithEstablisher(e Establisher) ReconcilerOption {
	return func(r *Reconciler) {
		r.objects = e
	}
}

// WithParser specifies how the Reconciler should parse a package.
func WithParser(p parser.Parser) ReconcilerOption {
	return func(r *Reconciler) {
		r.parser = p
	}
}

// WithParserBackend specifies how the Reconciler should parse a package.
func WithParserBackend(p parser.Backend) ReconcilerOption {
	return func(r *Reconciler) {
		r.backend = p
	}
}

// WithConfigStore specifies the ConfigStore to use for fetching image
// configurations.
func WithConfigStore(c xpkg.ConfigStore) ReconcilerOption {
	return func(r *Reconciler) {
		r.config = c
	}
}

// WithLinter specifies how the Reconciler should lint a package.
func WithLinter(l parser.Linter) ReconcilerOption {
	return func(r *Reconciler) {
		r.linter = l
	}
}

// WithVersioner specifies how the Reconciler should fetch the current
// Crossplane version.
func WithVersioner(v version.Operations) ReconcilerOption {
	return func(r *Reconciler) {
		r.versioner = v
	}
}

// WithNamespace specifies the namespace in which the Reconciler should create
// runtime resources.
func WithNamespace(n string) ReconcilerOption {
	return func(r *Reconciler) {
		r.namespace = n
	}
}

// WithServiceAccount specifies the core Crossplane ServiceAccount name.
func WithServiceAccount(sa string) ReconcilerOption {
	return func(r *Reconciler) {
		r.serviceAccount = sa
	}
}

// WithFeatureFlags specifies the feature flags to inject into the Reconciler.
func WithFeatureFlags(f *feature.Flags) ReconcilerOption {
	return func(r *Reconciler) {
		r.features = f
	}
}

// Reconciler reconciles packages.
type Reconciler struct {
	client     client.Client
	cache      xpkg.PackageCache
	revision   resource.Finalizer
	lock       DependencyManager
	objects    Establisher
	parser     parser.Parser
	linter     parser.Linter
	versioner  version.Operations
	backend    parser.Backend
	config     xpkg.ConfigStore
	log        logging.Logger
	record     event.Recorder
	conditions conditions.Manager
	features   *feature.Flags

	namespace      string
	serviceAccount string

	// newPackageRevision returns a fresh revision of the concrete type this
	// Reconciler instance handles (Provider, Configuration or Function).
	newPackageRevision func() v1.PackageRevision
}

// SetupProviderRevision adds a controller that reconciles ProviderRevisions.
func SetupProviderRevision(mgr ctrl.Manager, o controller.Options) error {
	name := "packages/" + strings.ToLower(v1.ProviderRevisionGroupKind)
	nr := func() v1.PackageRevision { return &v1.ProviderRevision{} }

	clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
	if err != nil {
		return errors.Wrap(err, errCannotInitializeHostClientSet)
	}

	metaScheme, err := xpkg.BuildMetaScheme()
	if err != nil {
		return errors.New(errCannotBuildMetaSchema)
	}
	objScheme, err := xpkg.BuildObjectScheme()
	if err != nil {
		return errors.New(errCannotBuildObjectSchema)
	}
	fetcher, err := xpkg.NewK8sFetcher(clientset, append(o.FetcherOptions, xpkg.WithNamespace(o.Namespace), xpkg.WithServiceAccount(o.ServiceAccount))...)
	if err != nil {
		return errors.Wrap(err, errCannotBuildFetcher)
	}

	log := o.Logger.WithValues("controller", name)
	cb := ctrl.NewControllerManagedBy(mgr).
		Named(name).
		For(&v1.ProviderRevision{}).
		Owns(&corev1.Secret{}). // Watch secret changes to react if pull or cert secrets change.
		Watches(&v1beta1.Lock{}, EnqueuePackageRevisionsForLock(mgr.GetClient(), &v1.ProviderRevisionList{}, log)).
		Watches(&v1beta1.ImageConfig{}, EnqueuePackageRevisionsForImageConfig(mgr.GetClient(), &v1.ProviderRevisionList{}, log))

	ro := []ReconcilerOption{
		WithCache(o.Cache),
		WithDependencyManager(NewPackageDependencyManager(mgr.GetClient(), dag.NewMapDag, v1.ProviderGroupVersionKind, log)),
		WithEstablisher(NewAPIEstablisher(mgr.GetClient(), o.Namespace, o.MaxConcurrentPackageEstablishers)),
		WithNewPackageRevisionFn(nr),
		WithParser(parser.New(metaScheme, objScheme)),
		WithParserBackend(NewImageBackend(fetcher, WithDefaultRegistry(o.DefaultRegistry))),
		WithConfigStore(xpkg.NewImageConfigStore(mgr.GetClient(), o.Namespace)),
		WithLinter(xpkg.NewProviderLinter()),
		WithLogger(log),
		WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
		WithNamespace(o.Namespace),
		WithServiceAccount(o.ServiceAccount),
		WithFeatureFlags(o.Features),
	}

	return cb.WithOptions(o.ForControllerRuntime()).
		Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(NewReconciler(mgr, ro...)), o.GlobalRateLimiter))
}

// SetupConfigurationRevision adds a controller that reconciles ConfigurationRevisions.
func SetupConfigurationRevision(mgr ctrl.Manager, o controller.Options) error {
	name := "packages/" + strings.ToLower(v1.ConfigurationRevisionGroupKind)
	nr := func() v1.PackageRevision { return &v1.ConfigurationRevision{} }

	cs, err := kubernetes.NewForConfig(mgr.GetConfig())
	if err != nil {
		return errors.Wrap(err, errCannotInitializeHostClientSet)
	}

	metaScheme, err := xpkg.BuildMetaScheme()
	if err != nil {
		return errors.New(errCannotBuildMetaSchema)
	}
	objScheme, err := xpkg.BuildObjectScheme()
	if err != nil {
		return errors.New(errCannotBuildObjectSchema)
	}
	f, err := xpkg.NewK8sFetcher(cs, append(o.FetcherOptions, xpkg.WithNamespace(o.Namespace), xpkg.WithServiceAccount(o.ServiceAccount))...)
	if err != nil {
		return errors.Wrap(err, errCannotBuildFetcher)
	}

	log := o.Logger.WithValues("controller", name)
	r := NewReconciler(mgr,
		WithCache(o.Cache),
		WithDependencyManager(NewPackageDependencyManager(mgr.GetClient(), dag.NewMapDag, v1.ConfigurationGroupVersionKind, log)),
		WithNewPackageRevisionFn(nr),
		WithEstablisher(NewAPIEstablisher(mgr.GetClient(), o.Namespace, o.MaxConcurrentPackageEstablishers)),
		WithParser(parser.New(metaScheme, objScheme)),
		WithParserBackend(NewImageBackend(f, WithDefaultRegistry(o.DefaultRegistry))),
		WithConfigStore(xpkg.NewImageConfigStore(mgr.GetClient(), o.Namespace)),
		WithLinter(xpkg.NewConfigurationLinter()),
		WithLogger(log),
		WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
		WithNamespace(o.Namespace),
		WithServiceAccount(o.ServiceAccount),
		WithFeatureFlags(o.Features),
	)

	return ctrl.NewControllerManagedBy(mgr).
		Named(name).
		For(&v1.ConfigurationRevision{}).
		Watches(&v1beta1.Lock{}, EnqueuePackageRevisionsForLock(mgr.GetClient(), &v1.ConfigurationRevisionList{}, log)).
		Watches(&v1beta1.ImageConfig{}, EnqueuePackageRevisionsForImageConfig(mgr.GetClient(), &v1.ConfigurationRevisionList{}, log)).
		WithOptions(o.ForControllerRuntime()).
		Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(r), o.GlobalRateLimiter))
}

// SetupFunctionRevision adds a controller that reconciles FunctionRevisions.
func SetupFunctionRevision(mgr ctrl.Manager, o controller.Options) error {
	name := "packages/" + strings.ToLower(v1.FunctionRevisionGroupKind)
	nr := func() v1.PackageRevision { return &v1.FunctionRevision{} }

	clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
	if err != nil {
		return errors.Wrap(err, errCannotInitializeHostClientSet)
	}

	metaScheme, err := xpkg.BuildMetaScheme()
	if err != nil {
		return errors.New(errCannotBuildMetaSchema)
	}
	objScheme, err := xpkg.BuildObjectScheme()
	if err != nil {
		return errors.New(errCannotBuildObjectSchema)
	}
	fetcher, err := xpkg.NewK8sFetcher(clientset, append(o.FetcherOptions, xpkg.WithNamespace(o.Namespace), xpkg.WithServiceAccount(o.ServiceAccount))...)
	if err != nil {
		return errors.Wrap(err, errCannotBuildFetcher)
	}

	log := o.Logger.WithValues("controller", name)
	cb := ctrl.NewControllerManagedBy(mgr).
		Named(name).
		For(&v1.FunctionRevision{}).
		Owns(&corev1.Secret{}). // Watch secret changes to react if pull or cert secrets change.
		Watches(&v1beta1.Lock{}, EnqueuePackageRevisionsForLock(mgr.GetClient(), &v1.FunctionRevisionList{}, log)).
		Watches(&v1beta1.ImageConfig{}, EnqueuePackageRevisionsForImageConfig(mgr.GetClient(), &v1.FunctionRevisionList{}, log))

	ro := []ReconcilerOption{
		WithCache(o.Cache),
		WithDependencyManager(NewPackageDependencyManager(mgr.GetClient(), dag.NewMapDag, v1.FunctionGroupVersionKind, log)),
		WithEstablisher(NewAPIEstablisher(mgr.GetClient(), o.Namespace, o.MaxConcurrentPackageEstablishers)),
		WithNewPackageRevisionFn(nr),
		WithParser(parser.New(metaScheme, objScheme)),
		WithParserBackend(NewImageBackend(fetcher, WithDefaultRegistry(o.DefaultRegistry))),
		WithConfigStore(xpkg.NewImageConfigStore(mgr.GetClient(), o.Namespace)),
		WithLinter(xpkg.NewFunctionLinter()),
		WithLogger(log),
		WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
		WithNamespace(o.Namespace),
		WithServiceAccount(o.ServiceAccount),
		WithFeatureFlags(o.Features),
	}

	return cb.WithOptions(o.ForControllerRuntime()).
		Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(NewReconciler(mgr, ro...)), o.GlobalRateLimiter))
}

// NewReconciler creates a new package revision reconciler.
func NewReconciler(mgr manager.Manager, opts ...ReconcilerOption) *Reconciler {
	// Defaults are no-op implementations; Setup* functions override them via
	// options.
	r := &Reconciler{
		client:     mgr.GetClient(),
		cache:      xpkg.NewNopCache(),
		revision:   resource.NewAPIFinalizer(mgr.GetClient(), finalizer),
		objects:    NewNopEstablisher(),
		parser:     parser.New(nil, nil),
		linter:     parser.NewPackageLinter(nil, nil, nil),
		versioner:  version.New(),
		log:        logging.NewNopLogger(),
		record:     event.NewNopRecorder(),
		conditions: conditions.ObservedGenerationPropagationManager{},
	}

	for _, f := range opts {
		f(r)
	}

	return r
}

// Reconcile package revision.
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { //nolint:gocognit // Reconcilers are often very complex.
	log := r.log.WithValues("request", req)
	log.Debug("Reconciling")

	ctx, cancel := context.WithTimeout(ctx, reconcileTimeout)
	defer cancel()

	pr := r.newPackageRevision()
	if err := r.client.Get(ctx, req.NamespacedName, pr); err != nil {
		// There's no need to requeue if we no longer exist. Otherwise
		// we'll be requeued implicitly because we return an error.
		log.Debug(errGetPackageRevision, "error", err)
		return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetPackageRevision)
	}

	status := r.conditions.For(pr)

	log = log.WithValues(
		"uid", pr.GetUID(),
		"version", pr.GetResourceVersion(),
		"name", pr.GetName(),
	)

	// Check the pause annotation and return if it has the value "true"
	// after logging, publishing an event and updating the SYNC status condition
	if meta.IsPaused(pr) {
		r.record.Event(pr, event.Normal(reasonPaused, reconcilePausedMsg))
		status.MarkConditions(xpv1.ReconcilePaused().WithMessage(reconcilePausedMsg))
		// If the pause annotation is removed, we will have a chance to reconcile again and resume
		// and if status update fails, we will reconcile again to retry to update the status
		return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
	}

	if meta.WasDeleted(pr) {
		// NOTE(hasheddan): In the event that a pre-cached package was
		// used for this revision, delete will not remove the pre-cached
		// package image from the cache unless it has the same name as
		// the provider revision. Delete will not return an error so we
		// will remove finalizer and leave the image in the cache.
		if err := r.cache.Delete(pr.GetName()); err != nil {
			err = errors.Wrap(err, errDeleteCache)
			r.record.Event(pr, event.Warning(reasonSync, err))
			return reconcile.Result{}, err
		}
		// NOTE(hasheddan): if we were previously marked as inactive, we
		// likely already removed self. If we skipped dependency
		// resolution, we will not be present in the lock.
		if err := r.lock.RemoveSelf(ctx, pr); err != nil {
			if kerrors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}
			err = errors.Wrap(err, errRemoveLock)
			r.record.Event(pr, event.Warning(reasonSync, err))
			return reconcile.Result{}, err
		}
		// Note(turkenh): During the deletion of an active package revision,
		// we don't need to run relinquish step since when the parent objects
		// (i.e. Package Revision) is gone, the controller reference on the
		// child objects (i.e. CRD) will be garbage collected.
		// We don't need to run the deactivate runtimeHook either since the owned
		// Deployment or similar objects will be garbage collected as well.
		if err := r.revision.RemoveFinalizer(ctx, pr); err != nil {
			if kerrors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}
			err = errors.Wrap(err, errRemoveFinalizer)
			r.record.Event(pr, event.Warning(reasonSync, err))
			return reconcile.Result{}, err
		}
		return reconcile.Result{Requeue: false}, nil
	}

	if c := pr.GetCondition(xpv1.ReconcilePaused().Type); c.Reason == xpv1.ReconcilePaused().Reason {
		pr.CleanConditions()
		// Persist the removal of conditions and return. We'll be requeued
		// with the updated status and resume reconciliation.
		return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
	}

	// Rewrite the image path if necessary. We need to do this before looking
	// for pull secrets, since the rewritten path may use different secrets than
	// the original.
	imagePath := pr.GetSource()
	rewriteConfigName, newPath, err := r.config.RewritePath(ctx, imagePath)
	if err != nil {
		err = errors.Wrap(err, errRewriteImage)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonImageConfig, err))
		return reconcile.Result{}, err
	}
	if newPath != "" {
		imagePath = newPath
		pr.SetAppliedImageConfigRefs(v1.ImageConfigRef{
			Name:   rewriteConfigName,
			Reason: v1.ImageConfigReasonRewrite,
		})
	} else {
		pr.ClearAppliedImageConfigRef(v1.ImageConfigReasonRewrite)
	}

	// Ensure the rewritten image path is persisted before we proceed.
	if pr.GetResolvedSource() != imagePath {
		pr.SetResolvedSource(imagePath)
		return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
	}

	if r.features.Enabled(features.EnableAlphaSignatureVerification) {
		// Wait for signature verification to complete before proceeding.
		if cond := pr.GetCondition(v1.TypeVerified); cond.Status != corev1.ConditionTrue {
			log.Debug("Waiting for signature verification controller to complete verification.", "condition", cond)
			// Initialize the revision healthy condition if they are not already
			// set to communicate the status of the revision.
			if pr.GetCondition(v1.TypeRevisionHealthy).Status == corev1.ConditionUnknown {
				status.MarkConditions(v1.AwaitingVerification())
				return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, pr), "cannot update status with awaiting verification")
			}
			return reconcile.Result{}, nil
		}
	}

	if err := r.revision.AddFinalizer(ctx, pr); err != nil {
		if kerrors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		err = errors.Wrap(err, errAddFinalizer)
		r.record.Event(pr, event.Warning(reasonSync, err))
		return reconcile.Result{}, err
	}

	pullSecretConfig, pullSecretFromConfig, err := r.config.PullSecretFor(ctx, pr.GetResolvedSource())
	if err != nil {
		err = errors.Wrap(err, errGetPullConfig)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonImageConfig, err))
		return reconcile.Result{}, err
	}

	// Determine the desired pull secret config ref state
	var psRef *v1.ImageConfigRef
	if pullSecretConfig != "" {
		psRef = &v1.ImageConfigRef{
			Name:   pullSecretConfig,
			Reason: v1.ImageConfigReasonSetPullSecret,
		}
	}

	// Check if the current applied image config ref for pull secret needs updating
	curr := getCurrentImageConfigRef(pr, v1.ImageConfigReasonSetPullSecret)
	if !imageConfigRefsEqual(curr, psRef) {
		// Update the applied image config refs and persist immediately
		pr.ClearAppliedImageConfigRef(v1.ImageConfigReasonSetPullSecret)
		if psRef != nil {
			log.Debug("Selected pull secret from image config store", "image", pr.GetResolvedSource(), "imageConfig", pullSecretConfig, "pullSecret", pullSecretFromConfig, "rewriteConfig", rewriteConfigName)
			r.record.Event(pr, event.Normal(reasonImageConfig, fmt.Sprintf("Selected pullSecret %q from ImageConfig %q for registry authentication", pullSecretFromConfig, pullSecretConfig)))
			pr.SetAppliedImageConfigRefs(*psRef)
		}
		return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
	}

	// Deactivate revision if it is inactive.
	if pr.GetDesiredState() == v1.PackageRevisionInactive {
		if err := r.deactivateRevision(ctx, pr); err != nil {
			if kerrors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}
			err = errors.Wrap(err, errDeactivateRevision)
			r.record.Event(pr, event.Warning(reasonDeactivate, err))
			return reconcile.Result{}, err
		}
		if len(pr.GetObjects()) > 0 {
			// Note(turkenh): If the revision is inactive we don't need to
			// fetch/parse the package again, so we can report success and return
			// here. The only exception is that revision NOT having references
			// to the objects that it owns, i.e. status.objectRefs is empty.
			// This could happen in one of the following two ways:
			// 1. The revision created as inactive, i.e. was never active before
			// which could be possible if package installed with
			// `revisionActivationPolicy` as `Manual`.
			// 2. The status of the revision got lost, e.g. the status subresource
			// is not properly restored after a backup/restore operation.
			// So, we report success and return here iff the revision is inactive
			// and it has references to the objects that it owns.
			// Otherwise, we continue with fetching/parsing the package which
			// would trigger another reconcile after setting object references
			// in the status where we finalize the deactivation by transitioning
			// from "controller" to "owner" on owned resources.
			// We still want to call r.deactivateRevision() above, i.e. even
			// status.objectRefs is empty, to make sure that the revision is
			// removed from the lock which could otherwise block a successful
			// reconciliation.
			if pr.GetCondition(v1.TypeRevisionHealthy).Status != corev1.ConditionTrue {
				// NOTE(phisco): We don't want to spam the user with events if the
				// package revision is already healthy.
				r.record.Event(pr, event.Normal(reasonSync, "Successfully reconciled package revision"))
			}
			status.MarkConditions(v1.RevisionHealthy())
			return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
		}
	}

	// NOTE(negz): There are a bunch of cases below where we ignore errors
	// returned while updating our status to reflect that our revision is
	// unhealthy (or of unknown health). This is because:
	//
	// 1. We prefer to return the 'original' underlying error.
	// 2. We'll requeue and try the status update again if needed.
	// 3. There's little else we could do about it apart from log.
	pullPolicyNever := false
	id := pr.GetName()
	// If packagePullPolicy is Never, the identifier is the package source and
	// contents must be in the cache.
	if pr.GetPackagePullPolicy() != nil && *pr.GetPackagePullPolicy() == corev1.PullNever {
		pullPolicyNever = true
		id = pr.GetSource()
	}

	var rc io.ReadCloser
	// cacheWrite is closed (or receives an error) once the package contents
	// are fully written to the cache; we wait on it after parsing.
	cacheWrite := make(chan error)

	if r.cache.Has(id) {
		var err error
		rc, err = r.cache.Get(id)
		if err != nil {
			// If package contents are in the cache, but we cannot access them,
			// we clear them and try again.
			_ = r.cache.Delete(id)
			err = errors.Wrap(err, errGetCache)
			r.record.Event(pr, event.Warning(reasonParse, err))
			return reconcile.Result{}, err
		}
		// If we got content from cache we don't need to wait for it to be
		// written.
		close(cacheWrite)
	}

	// packagePullPolicy is Never and contents are not in the cache so we return
	// an error.
	if rc == nil && pullPolicyNever {
		err := errors.New(errPullPolicyNever)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonParse, err))
		return reconcile.Result{}, err
	}

	// If we didn't get a ReadCloser from cache, we need to get it from image.
	if rc == nil {
		bo := []parser.BackendOption{PackageRevision(pr)}
		if pullSecretConfig != "" {
			bo = append(bo, PullSecretFromConfig(pullSecretFromConfig))
		}
		// Initialize parser backend to obtain package contents.
		imgrc, err := r.backend.Init(ctx, bo...)
		if err != nil {
			err = errors.Wrap(err, errInitParserBackend)
			status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
			_ = r.client.Status().Update(ctx, pr)
			r.record.Event(pr, event.Warning(reasonParse, err))
			// Requeue because we may be waiting for parent package
			// controller to recreate Pod.
			return reconcile.Result{}, err
		}

		// Package is not in cache, so we write it to the cache while parsing.
		pipeR, pipeW := io.Pipe()
		rc = xpkg.TeeReadCloser(imgrc, pipeW)
		go func() {
			defer pipeR.Close() //nolint:errcheck // Not much we can do if this fails.
			if err := r.cache.Store(pr.GetName(), pipeR); err != nil {
				_ = pipeR.CloseWithError(err)
				cacheWrite <- err
				return
			}
			close(cacheWrite)
		}()
	}

	// Parse package contents.
	pkg, err := r.parser.Parse(ctx, struct {
		io.Reader
		io.Closer
	}{
		Reader: io.LimitReader(rc, maxPackageSize),
		Closer: rc,
	})
	// Wait until we finish writing to cache. Parser closes the reader.
	if err := <-cacheWrite; err != nil {
		// If we failed to cache we want to cleanup, but we don't abort unless
		// parsing also failed. Subsequent reconciles will pull image again and
		// attempt to cache.
		if err := r.cache.Delete(id); err != nil {
			log.Debug(errDeleteCache, "error", err)
		}
	}
	if err != nil {
		err = errors.Wrap(err, errParsePackage)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonParse, err))
		return reconcile.Result{}, err
	}

	// Lint package using package-specific linter.
	if err := r.linter.Lint(pkg); err != nil {
		err = errors.Wrap(err, errLintPackage)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonLint, err))
		// NOTE(hasheddan): a failed lint typically will require manual
		// intervention, but on the off chance that we read pod logs
		// early, which caused a linting failure, we will requeue by
		// returning an error.
		return reconcile.Result{}, err
	}

	// NOTE(hasheddan): the linter should check this property already, but
	// if a consumer forgets to pass an option to guarantee one meta object,
	// we check here to avoid a potential panic on 0 index below.
	if len(pkg.GetMeta()) != 1 {
		err = errors.New(errNotOneMeta)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonLint, err))
		return reconcile.Result{}, err
	}

	pkgMeta, _ := xpkg.TryConvertToPkg(pkg.GetMeta()[0], &pkgmetav1.Provider{}, &pkgmetav1.Configuration{}, &pkgmetav1.Function{})

	// Propagate package meta labels and annotations onto the revision.
	meta.AddLabels(pr, pkgMeta.GetLabels())
	meta.AddAnnotations(pr, pkgMeta.GetAnnotations())
	if err := r.client.Update(ctx, pr); err != nil {
		if kerrors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		err = errors.Wrap(err, errUpdateMeta)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonSync, err))
		return reconcile.Result{}, err
	}

	// Check Crossplane constraints if they exist.
	if pr.GetIgnoreCrossplaneConstraints() == nil || !*pr.GetIgnoreCrossplaneConstraints() {
		if err := xpkg.PackageCrossplaneCompatible(r.versioner)(pkgMeta); err != nil {
			err = errors.Wrap(err, errIncompatible)
			status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
			r.record.Event(pr, event.Warning(reasonLint, err))
			// No need to requeue if outside version constraints.
			// Package will either need to be updated or ignore
			// crossplane constraints will need to be specified,
			// both of which will trigger a new reconcile.
			return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
		}
	}

	// Check status of package dependencies unless package specifies to skip
	// resolution.
	if pr.GetSkipDependencyResolution() != nil && !*pr.GetSkipDependencyResolution() {
		found, installed, invalid, err := r.lock.Resolve(ctx, pkgMeta, pr)
		pr.SetDependencyStatus(int64(found), int64(installed), int64(invalid))
		if err != nil {
			if kerrors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}
			err = errors.Wrap(err, errResolveDeps)
			status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
			_ = r.client.Status().Update(ctx, pr)
			r.record.Event(pr, event.Warning(reasonDependencies, err))
			return reconcile.Result{}, err
		}
	}

	// Establish control or ownership of objects.
	refs, err := r.objects.Establish(ctx, pkg.GetObjects(), pr, pr.GetDesiredState() == v1.PackageRevisionActive)
	if err != nil {
		if kerrors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		err = errors.Wrap(err, errEstablishControl)
		status.MarkConditions(v1.RevisionUnhealthy().WithMessage(err.Error()))
		_ = r.client.Status().Update(ctx, pr)
		r.record.Event(pr, event.Warning(reasonSync, err))
		return reconcile.Result{}, err
	}

	// Update object list in package revision status with objects for which
	// ownership or control has been established.
	// NOTE(hasheddan): we avoid the overhead of performing a stable sort here
	// as we are not concerned with preserving the existing ordering of the
	// slice, but rather the existing references in the status of the package
	// revision. We should also not have equivalent references in the slice, but
	// a poorly formed, but still valid package could contain duplicates.
	// However, in that case the references would be identical (including UUID),
	// so unstable sort order would not cause a diff in the package revision
	// status.
	// See https://github.com/crossplane/crossplane/issues/3466 for tracking
	// restricting duplicate resources in packages.
	sort.Slice(refs, func(i, j int) bool {
		return uniqueResourceIdentifier(refs[i]) > uniqueResourceIdentifier(refs[j])
	})
	pr.SetObjects(refs)

	if pr.GetCondition(v1.TypeRevisionHealthy).Status != corev1.ConditionTrue {
		// NOTE(phisco): We don't want to spam the user with events if the
		// package revision is already healthy.
		r.record.Event(pr, event.Normal(reasonSync, "Successfully reconciled package revision"))
	}
	status.MarkConditions(v1.RevisionHealthy())
	return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, pr), errUpdateStatus)
}

// deactivateRevision removes the revision from the Lock and releases control
// of the objects it owns.
func (r *Reconciler) deactivateRevision(ctx context.Context, pr v1.PackageRevision) error {
	// Remove self from the lock if we are present.
	if err := r.lock.RemoveSelf(ctx, pr); err != nil {
		return errors.Wrap(err, errRemoveLock)
	}

	// Release control of owned objects.
	if err := r.objects.ReleaseObjects(ctx, pr); err != nil {
		return errors.Wrap(err, errReleaseObjects)
	}

	return nil
}

// uniqueResourceIdentifier returns a unique identifier for a resource in a
// package, consisting of the group, version, kind, and name.
func uniqueResourceIdentifier(ref xpv1.TypedReference) string {
	parts := []string{ref.GroupVersionKind().String(), ref.Name}
	return strings.Join(parts, "/")
}

// getCurrentImageConfigRef returns the applied image config ref recorded on the
// package revision for the given reason, or nil if no such ref exists.
func getCurrentImageConfigRef(pr v1.PackageRevision, reason v1.ImageConfigRefReason) *v1.ImageConfigRef {
	refs := pr.GetAppliedImageConfigRefs()
	for i := range refs {
		if refs[i].Reason == reason {
			// Index into the slice rather than taking the address of the
			// range variable.
			return &refs[i]
		}
	}
	return nil
}

// imageConfigRefsEqual compares two image config refs for equality. A nil ref
// represents the absence of a ref; two nils are considered equal.
func imageConfigRefsEqual(a, b *v1.ImageConfigRef) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil || b == nil:
		return false
	}
	return a.Name == b.Name && a.Reason == b.Reason
}
/*
Copyright 2021 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package revision

import (
	"context"
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/crossplane/crossplane-runtime/pkg/logging"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/apis/pkg/v1beta1"
)

// EnqueuePackageRevisionsForImageConfig enqueues a reconcile for all package
// revisions an ImageConfig applies to. The supplied list is used only as a
// prototype; it is deep copied before each List call.
func EnqueuePackageRevisionsForImageConfig(kube client.Client, l v1.PackageRevisionList, log logging.Logger) handler.EventHandler {
	return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
		ic, ok := o.(*v1beta1.ImageConfig)
		if !ok {
			return nil
		}
		// We only care about ImageConfigs that have a pull secret.
		if ic.Spec.Registry == nil || ic.Spec.Registry.Authentication == nil || ic.Spec.Registry.Authentication.PullSecretRef.Name == "" {
			return nil
		}
		// Enqueue all package revisions matching the prefixes in the ImageConfig.
		rl := l.DeepCopyObject().(v1.PackageRevisionList) //nolint:forcetypeassert // Guaranteed to be PackageRevisionList.
		if err := kube.List(ctx, rl); err != nil {
			// Nothing we can do, except logging, if we can't list
			// package revisions.
			log.Debug("Cannot list package revisions while attempting to enqueue from ImageConfig", "error", err)
			return nil
		}

		var matches []reconcile.Request
		for _, rev := range rl.GetRevisions() {
			for _, m := range ic.Spec.MatchImages {
				// A revision matches if either its declared or its resolved
				// image source starts with the configured prefix.
				if strings.HasPrefix(rev.GetSource(), m.Prefix) || strings.HasPrefix(rev.GetResolvedSource(), m.Prefix) {
					log.Debug("Enqueuing for image config",
						"revision-type", fmt.Sprintf("%T", rev),
						"revision-name", rev.GetName(),
						"image-config-name", ic.GetName())
					matches = append(matches, reconcile.Request{NamespacedName: types.NamespacedName{Name: rev.GetName()}})
				}
			}
		}

		return matches
	})
}

// EnqueuePackageRevisionsForLock enqueues a reconcile for all package
// revisions when the Lock changes.
func EnqueuePackageRevisionsForLock(kube client.Client, l v1.PackageRevisionList, log logging.Logger) handler.EventHandler {
	return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
		if _, ok := o.(*v1beta1.Lock); !ok {
			return nil
		}

		rl := l.DeepCopyObject().(v1.PackageRevisionList) //nolint:forcetypeassert // Guaranteed to be PackageRevisionList.
		if err := kube.List(ctx, rl); err != nil {
			// Nothing we can do, except logging, if we can't list
			// package revisions.
			log.Debug("Cannot list package revisions while attempting to enqueue from Lock", "error", err)
			return nil
		}

		// Any Lock change may affect any revision, so enqueue them all.
		var matches []reconcile.Request
		for _, rev := range rl.GetRevisions() {
			matches = append(matches, reconcile.Request{NamespacedName: types.NamespacedName{Name: rev.GetName()}})
		}

		return matches
	})
}
/* Copyright 2023 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package roles import ( "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" v1 "github.com/crossplane/crossplane/apis/pkg/v1" ) func FuzzRenderClusterRoles(f *testing.F) { f.Fuzz(func(_ *testing.T, data []byte) { ff := fuzz.NewConsumer(data) pr := &v1.ProviderRevision{} ff.GenerateStruct(pr) rs := make([]Resource, 0) ff.CreateSlice(&rs) if len(rs) == 0 { return } _ = RenderClusterRoles(pr, rs) }) }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package roles implements the RBAC manager's support for providers.
package roles

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-containerregistry/pkg/name"
	rbacv1 "k8s.io/api/rbac/v1"
	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/event"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/meta"
	"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
	"github.com/crossplane/crossplane-runtime/pkg/resource"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
	"github.com/crossplane/crossplane/internal/controller/rbac/controller"
)

const (
	// timeout bounds a single reconcile attempt.
	timeout = 2 * time.Minute

	// Error strings used to wrap errors and emit events.
	errGetPR     = "cannot get ProviderRevision"
	errListPRs   = "cannot list ProviderRevisions"
	errApplyRole = "cannot apply ClusterRole"
)

// Event reasons.
const (
	reasonApplyRoles event.Reason = "ApplyClusterRoles"
)

// A ClusterRoleRenderer renders ClusterRoles for the given resources.
type ClusterRoleRenderer interface { // RenderClusterRoles for the supplied resources. RenderClusterRoles(pr *v1.ProviderRevision, rs []Resource) []rbacv1.ClusterRole } // A ClusterRoleRenderFn renders ClusterRoles for the supplied resources. type ClusterRoleRenderFn func(pr *v1.ProviderRevision, rs []Resource) []rbacv1.ClusterRole // RenderClusterRoles renders ClusterRoles for the supplied CRDs. func (fn ClusterRoleRenderFn) RenderClusterRoles(pr *v1.ProviderRevision, rs []Resource) []rbacv1.ClusterRole { return fn(pr, rs) } // Setup adds a controller that reconciles a ProviderRevision by creating a // series of opinionated ClusterRoles that may be bound to allow access to the // resources it defines. func Setup(mgr ctrl.Manager, o controller.Options) error { name := "rbac-roles/" + strings.ToLower(v1.ProviderRevisionGroupKind) if o.AllowClusterRole == "" { r := NewReconciler(mgr, WithLogger(o.Logger.WithValues("controller", name)), WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))) return ctrl.NewControllerManagedBy(mgr). Named(name). For(&v1.ProviderRevision{}). Owns(&rbacv1.ClusterRole{}). WithOptions(o.ForControllerRuntime()). Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(r), o.GlobalRateLimiter)) } sfh := &EnqueueRequestForAllRevisionsInFamily{ client: mgr.GetClient(), } r := NewReconciler(mgr, WithLogger(o.Logger.WithValues("controller", name)), WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), WithOrgDiffer(OrgDiffer{DefaultRegistry: o.DefaultRegistry})) return ctrl.NewControllerManagedBy(mgr). Named(name). For(&v1.ProviderRevision{}). Owns(&rbacv1.ClusterRole{}). Watches(&v1.ProviderRevision{}, sfh). WithOptions(o.ForControllerRuntime()). Complete(ratelimiter.NewReconciler(name, errors.WithSilentRequeueOnConflict(r), o.GlobalRateLimiter)) } // ReconcilerOption is used to configure the Reconciler. 
type ReconcilerOption func(*Reconciler) // WithLogger specifies how the Reconciler should log messages. func WithLogger(log logging.Logger) ReconcilerOption { return func(r *Reconciler) { r.log = log } } // WithRecorder specifies how the Reconciler should record Kubernetes events. func WithRecorder(er event.Recorder) ReconcilerOption { return func(r *Reconciler) { r.record = er } } // WithClientApplicator specifies how the Reconciler should interact with the // Kubernetes API. func WithClientApplicator(ca resource.ClientApplicator) ReconcilerOption { return func(r *Reconciler) { r.client = ca } } // WithClusterRoleRenderer specifies how the Reconciler should render RBAC // ClusterRoles. func WithClusterRoleRenderer(rr ClusterRoleRenderer) ReconcilerOption { return func(r *Reconciler) { r.rbac.ClusterRoleRenderer = rr } } // WithOrgDiffer specifies how the Reconciler should diff OCI orgs. It does this // to ensure that two providers may only be part of the same family if they're // in the same OCI org. func WithOrgDiffer(d OrgDiffer) ReconcilerOption { return func(r *Reconciler) { r.org = d } } // NewReconciler returns a Reconciler of ProviderRevisions. func NewReconciler(mgr manager.Manager, opts ...ReconcilerOption) *Reconciler { r := &Reconciler{ // TODO(negz): Is Updating appropriate here? Probably. client: resource.ClientApplicator{ Client: mgr.GetClient(), Applicator: resource.NewAPIUpdatingApplicator(mgr.GetClient()), }, rbac: rbac{ ClusterRoleRenderer: ClusterRoleRenderFn(RenderClusterRoles), }, log: logging.NewNopLogger(), record: event.NewNopRecorder(), } for _, f := range opts { f(r) } return r } type rbac struct { ClusterRoleRenderer } // A Reconciler reconciles ProviderRevisions. type Reconciler struct { client resource.ClientApplicator rbac rbac org OrgDiffer log logging.Logger record event.Recorder } // Reconcile a ProviderRevision by creating a series of opinionated ClusterRoles // that may be bound to allow access to the resources it defines. 
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	log := r.log.WithValues("request", req)
	log.Debug("Reconciling")

	// Bound the whole reconcile attempt by the package-level timeout.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	pr := &v1.ProviderRevision{}
	if err := r.client.Get(ctx, req.NamespacedName, pr); err != nil {
		// In case object is not found, most likely the object was deleted and
		// then disappeared while the event was in the processing queue. We
		// don't need to take any action in that case.
		log.Debug(errGetPR, "error", err)
		return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetPR)
	}

	log = log.WithValues(
		"uid", pr.GetUID(),
		"version", pr.GetResourceVersion(),
		"name", pr.GetName(),
	)

	// Return early, without requeueing, if the pause annotation is set.
	if meta.IsPaused(pr) {
		return reconcile.Result{}, nil
	}

	if meta.WasDeleted(pr) {
		// There's nothing to do if our PR is being deleted. Any ClusterRoles
		// we created will be garbage collected by Kubernetes.
		return reconcile.Result{Requeue: false}, nil
	}

	resources := DefinedResources(pr.Status.ObjectRefs)

	// If this revision is part of a provider family we consider it to 'own' all
	// of the family's CRDs (despite it not actually being an owner reference).
	// This allows the revision to use core types installed by another provider,
	// e.g. ProviderConfigs. It also allows the provider to cross-resource
	// reference resources from other providers within its family.
	//
	// TODO(negz): Once generic cross-resource references are implemented we can
	// reduce this to only allowing access to core types, like ProviderConfig.
	// https://github.com/crossplane/crossplane/issues/1770
	if family := pr.GetLabels()[v1.LabelProviderFamily]; family != "" {
		// TODO(negz): Get active revisions in family.
		prs := &v1.ProviderRevisionList{}
		if err := r.client.List(ctx, prs, client.MatchingLabels{v1.LabelProviderFamily: family}); err != nil {
			if kerrors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}

			err = errors.Wrap(err, errListPRs)
			r.record.Event(pr, event.Warning(reasonApplyRoles, err))

			return reconcile.Result{}, err
		}

		// TODO(negz): Should we filter down to only active revisions? I don't
		// think there's any benefit. If the revision is inactive and never
		// created any CRDs there will be no CRDs to grant permissions for. If
		// it's inactive but did create (or would share) CRDs then this provider
		// might try use them, and we should let it.
		for _, member := range prs.Items {
			// We already added our own resources.
			if member.GetUID() == pr.GetUID() {
				continue
			}

			// We only allow packages in the same OCI registry and org to be
			// part of the same family. This prevents a malicious provider from
			// declaring itself part of a family and thus being granted RBAC
			// access to its types.
			// TODO(negz): Consider using package signing here in future.
			if r.org.Differs(pr.Spec.Package, member.Spec.Package) {
				continue
			}

			resources = append(resources, DefinedResources(member.Status.ObjectRefs)...)
		}
	}

	// Track the names of the roles we actually changed so we can emit a
	// single summary event below.
	applied := make([]string, 0)

	for _, cr := range r.rbac.RenderClusterRoles(pr, resources) {
		log := log.WithValues("role-name", cr.GetName())
		origRV := ""

		err := r.client.Apply(ctx, &cr,
			resource.MustBeControllableBy(pr.GetUID()),
			resource.AllowUpdateIf(ClusterRolesDiffer),
			resource.StoreCurrentRV(&origRV),
		)
		if resource.IsNotAllowed(err) {
			// AllowUpdateIf rejected the update because the desired role does
			// not differ from the current one; nothing to do.
			log.Debug("Skipped no-op RBAC ClusterRole apply")
			continue
		}

		if err != nil {
			if kerrors.IsConflict(err) {
				return reconcile.Result{Requeue: true}, nil
			}

			err = errors.Wrap(err, errApplyRole)
			r.record.Event(pr, event.Warning(reasonApplyRoles, err))

			return reconcile.Result{}, err
		}

		// A changed resource version means the apply actually wrote something.
		if cr.GetResourceVersion() != origRV {
			log.Debug("Applied RBAC ClusterRole")
			applied = append(applied, cr.GetName())
		}
	}

	if len(applied) > 0 {
		r.record.Event(pr, event.Normal(reasonApplyRoles, fmt.Sprintf("Applied RBAC ClusterRoles: %s", resource.StableNAndSomeMore(resource.DefaultFirstN, applied))))
	}

	// TODO(negz): Add a condition that indicates the RBAC manager is
	// managing cluster roles for this ProviderRevision?

	// There's no need to requeue explicitly - we're watching all PRs.
	return reconcile.Result{Requeue: false}, nil
}

// DefinedResources returns the resources defined by the supplied references.
// Only CustomResourceDefinition references are considered; everything else is
// skipped.
func DefinedResources(refs []xpv1.TypedReference) []Resource {
	out := make([]Resource, 0, len(refs))

	for _, ref := range refs {
		// This would only return an error if the APIVersion contained more than
		// one "/". This should be impossible, but if it somehow happens we'll
		// just skip this resource since it can't be a CRD.
		gv, _ := schema.ParseGroupVersion(ref.APIVersion)

		// We're only concerned with CRDs.
		if gv.Group != apiextensions.GroupName || ref.Kind != "CustomResourceDefinition" {
			continue
		}

		p, g, valid := strings.Cut(ref.Name, ".")
		if !valid {
			// This shouldn't be possible - CRDs must be named <plural>.<group>.
			continue
		}

		out = append(out, Resource{Group: g, Plural: p})
	}

	return out
}

// ClusterRolesDiffer returns true if the supplied objects are different
// ClusterRoles. We consider ClusterRoles to be different if their labels and
// rules do not match.
func ClusterRolesDiffer(current, desired runtime.Object) bool {
	// Calling this with anything but ClusterRoles is a programming error. If it
	// happens, we probably do want to panic.
	c := current.(*rbacv1.ClusterRole) //nolint:forcetypeassert // See above.
	d := desired.(*rbacv1.ClusterRole) //nolint:forcetypeassert // See above.

	return !cmp.Equal(c.GetLabels(), d.GetLabels()) || !cmp.Equal(c.Rules, d.Rules)
}

// An OrgDiffer determines whether two references are part of the same org. In
// this context we consider an org to consist of:
//
//   - The registry (e.g. xpkg.upbound.io or index.docker.io).
//   - The part of the repository path before the first slash (e.g. crossplane
//     in crossplane/provider-aws).
type OrgDiffer struct {
	// The default OCI registry to use when parsing references.
	DefaultRegistry string
}

// Differs returns true if the supplied references are not part of the same OCI
// registry and org.
func (d OrgDiffer) Differs(a, b string) bool {
	// If we can't parse either reference we can't compare them. Safest thing to
	// do is to assume they're not part of the same org.
	ra, err := name.ParseReference(a, name.WithDefaultRegistry(d.DefaultRegistry))
	if err != nil {
		return true
	}

	rb, err := name.ParseReference(b, name.WithDefaultRegistry(d.DefaultRegistry))
	if err != nil {
		return true
	}

	ca := ra.Context()
	cb := rb.Context()

	// If the registries (e.g. xpkg.upbound.io) don't match they're not in the
	// same org.
	if ca.RegistryStr() != cb.RegistryStr() {
		return true
	}

	// Compare the first path segment of each repository (the OCI org).
	oa := strings.Split(ca.RepositoryStr(), "/")[0]
	ob := strings.Split(cb.RepositoryStr(), "/")[0]

	return oa != ob
}
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package roles

import (
	"context"
	"fmt"

	rbacv1 "k8s.io/api/rbac/v1"
)

const (
	// wildcard matches any value in a rule path segment.
	wildcard = "*"

	// Path prefixes distinguishing resource rules from non-resource-URL rules
	// in the tree.
	pathPrefixURL      = "url"
	pathPrefixResource = "resource"
)

// A path represents a rule within the rule tree.
type path []string

// A node within the rule tree. Each node indicates whether the path should be
// allowed. In practice only leaves (i.e. Verbs) should have this set.
type node struct {
	allowed  bool
	children map[string]*node
}

// newNode initialises and returns a Node.
func newNode() *node {
	return &node{children: map[string]*node{}}
}

// Allow the supplied path.
func (n *node) Allow(p path) {
	// An empty path means every segment has been consumed; mark this node
	// as allowed.
	if len(p) == 0 {
		n.allowed = true
		return
	}

	k := p[0]
	if _, ok := n.children[k]; !ok {
		n.children[k] = newNode()
	}

	n.children[k].Allow(p[1:])
}

// Allowed returns true if the supplied path is allowed.
func (n *node) Allowed(p path) bool {
	if len(p) == 0 {
		return false
	}

	// TODO(negz): A NonResourceURL can't be a wildcard, but can have a final
	// segment that is a wildcard; e.g. /apis/*. We don't currently support
	// this.

	// At each level try both the literal segment and the wildcard child.
	for _, k := range []string{p[0], wildcard} {
		if c, ok := n.children[k]; ok {
			if c.allowed {
				return true
			}

			if c.Allowed(p[1:]) {
				return true
			}
		}
	}

	return false
}

// A Rule represents a single, granular RBAC rule.
type Rule struct {
	// The API group of this resource. The empty string denotes the core
	// Kubernetes API group. '*' represents any API group.
	APIGroup string

	// The resource in question. '*' represents any resource.
	Resource string

	// The name of the resource. Unlike the rbacv1 API, we use '*' to represent
	// any resource name.
	ResourceName string

	// A non-resource URL. Mutually exclusive with the above resource fields.
	NonResourceURL string

	// The verb this rule allows.
	Verb string
}

// String returns a human-readable representation of the Rule.
func (r Rule) String() string {
	if r.NonResourceURL != "" {
		return fmt.Sprintf("{nonResourceURL: %q, verb: %q}", r.NonResourceURL, r.Verb)
	}

	return fmt.Sprintf("{apiGroup: %q, resource: %q, resourceName: %q, verb: %q}", r.APIGroup, r.Resource, r.ResourceName, r.Verb)
}

// path of the rule, for use in a tree.
func (r Rule) path() path {
	if r.NonResourceURL != "" {
		return path{pathPrefixURL, r.NonResourceURL, r.Verb}
	}

	return path{pathPrefixResource, r.APIGroup, r.Resource, r.ResourceName, r.Verb}
}

// Expand RBAC policy rules into our granular rules.
func Expand(ctx context.Context, rs ...rbacv1.PolicyRule) ([]Rule, error) { //nolint:gocognit // Granular rules are inherently complex.
	out := make([]Rule, 0, len(rs))

	for _, r := range rs {
		// Non-resource URLs expand to one granular rule per (URL, verb) pair.
		for _, u := range r.NonResourceURLs {
			for _, v := range r.Verbs {
				// exit if ctx is done
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				default:
				}

				out = append(out, Rule{NonResourceURL: u, Verb: v})
			}
		}

		// ResourceNames are somewhat unique in rbacv1.PolicyRule in that no
		// names means all names. APIGroups and Resources use the wildcard to
		// represent that. We use the wildcard here too to simplify our logic.
		names := r.ResourceNames
		if len(names) < 1 {
			names = []string{wildcard}
		}

		// Resource rules expand to the cross product of groups, resources,
		// names and verbs.
		for _, g := range r.APIGroups {
			for _, rsc := range r.Resources {
				for _, n := range names {
					for _, v := range r.Verbs {
						select {
						case <-ctx.Done():
							return nil, ctx.Err()
						default:
						}

						out = append(out, Rule{APIGroup: g, Resource: rsc, ResourceName: n, Verb: v})
					}
				}
			}
		}
	}

	return out, nil
}
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package roles

import (
	"sort"

	coordinationv1 "k8s.io/api/coordination/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/crossplane/crossplane-runtime/pkg/meta"

	v1 "github.com/crossplane/crossplane/apis/pkg/v1"
)

const (
	// Naming pieces for the rendered ClusterRoles.
	namePrefix       = "crossplane:provider:"
	nameSuffixEdit   = ":aggregate-to-edit"
	nameSuffixView   = ":aggregate-to-view"
	nameSuffixSystem = ":system"

	// Labels used to aggregate the rendered roles into built-in roles.
	keyAggregateToCrossplane = "rbac.crossplane.io/aggregate-to-crossplane"
	keyAggregateToAdmin      = "rbac.crossplane.io/aggregate-to-admin"
	keyAggregateToEdit       = "rbac.crossplane.io/aggregate-to-edit"
	keyAggregateToView       = "rbac.crossplane.io/aggregate-to-view"

	keyProviderName = "rbac.crossplane.io/system"

	valTrue = "true"

	// Subresource suffixes appended to plural resource names.
	suffixStatus     = "/status"
	suffixFinalizers = "/finalizers"

	pluralEvents     = "events"
	pluralConfigmaps = "configmaps"
	pluralSecrets    = "secrets"
	pluralLeases     = "leases"
)

//nolint:gochecknoglobals // We treat these as constants.
var (
	verbsEdit   = []string{rbacv1.VerbAll}
	verbsView   = []string{"get", "list", "watch"}
	verbsSystem = []string{"get", "list", "watch", "update", "patch", "create"}
	verbsUpdate = []string{"update"}
)

// Extra rules that are granted to all provider pods.
// TODO(negz): Should we require providers to ask for these explicitly? The vast
// majority of providers will need them:
//
// * Secrets for provider credentials and connection secrets.
// * ConfigMaps for leader election.
// * Leases for leader election.
// * Events for debugging.
//
//nolint:gochecknoglobals // We treat this as a constant.
var rulesSystemExtra = []rbacv1.PolicyRule{
	{
		APIGroups: []string{"", coordinationv1.GroupName},
		Resources: []string{pluralSecrets, pluralConfigmaps, pluralEvents, pluralLeases},
		Verbs:     verbsEdit,
	},
}

// SystemClusterRoleName returns the name of the 'system' cluster role - i.e.
// the role that a provider's ServiceAccount should be bound to.
func SystemClusterRoleName(revisionName string) string {
	return namePrefix + revisionName + nameSuffixSystem
}

// A Resource is a Kubernetes API resource.
type Resource struct {
	// Group is the unversioned API group of this resource.
	Group string

	// Plural is the plural name of this resource.
	Plural string
}

// RenderClusterRoles returns ClusterRoles for the supplied ProviderRevision.
// It renders three roles - edit, view and system - all owned (in the owner
// reference sense) by the revision.
func RenderClusterRoles(pr *v1.ProviderRevision, rs []Resource) []rbacv1.ClusterRole {
	// Return early if we have no resources to render roles for.
	if len(rs) == 0 {
		return nil
	}

	// Our list of resources has no guaranteed order, so we sort them in order
	// to ensure we don't reorder our RBAC rules on each update.
	sort.Slice(rs, func(i, j int) bool { return rs[i].Plural+rs[i].Group < rs[j].Plural+rs[j].Group })

	groups := make([]string, 0)            // Allows deterministic iteration over groups.
	resources := make(map[string][]string) // Resources by group.

	for _, r := range rs {
		if _, ok := resources[r.Group]; !ok {
			resources[r.Group] = make([]string, 0)
			groups = append(groups, r.Group)
		}

		// Grant access to the resource and its status subresource.
		resources[r.Group] = append(resources[r.Group], r.Plural, r.Plural+suffixStatus)
	}

	rules := []rbacv1.PolicyRule{}
	for _, g := range groups {
		rules = append(rules, rbacv1.PolicyRule{
			APIGroups: []string{g},
			Resources: resources[g],
		})
	}

	// Provider pods may create Kubernetes secrets containing managed resource connection details.
	// These secrets are controlled (in the owner reference sense) by the managed resource.
	// Crossplane needs permission to set finalizers on managed resources in order to create secrets
	// that block their deletion when the OwnerReferencesPermissionEnforcement admission controller is enabled.
	ruleFinalizers := rbacv1.PolicyRule{
		APIGroups: groups,
		Resources: []string{rbacv1.ResourceAll + suffixFinalizers},
		Verbs:     verbsUpdate,
	}

	edit := &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name: namePrefix + pr.GetName() + nameSuffixEdit,
			Labels: map[string]string{
				// Edit rules aggregate to the Crossplane ClusterRole too.
				// Crossplane needs access to reconcile all composite resources
				// and composite resource claims.
				keyAggregateToCrossplane: valTrue,

				// Edit rules aggregate to admin too. Currently edit and admin
				// differ only in their base roles.
				keyAggregateToAdmin: valTrue,
				keyAggregateToEdit:  valTrue,
			},
		},
		Rules: withVerbs(rules, verbsEdit),
	}

	view := &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name: namePrefix + pr.GetName() + nameSuffixView,
			Labels: map[string]string{
				keyAggregateToView: valTrue,
			},
		},
		Rules: withVerbs(rules, verbsView),
	}

	// The 'system' RBAC role does not aggregate; it is intended to be bound
	// directly to the service account the provider runs as.
	system := &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name: SystemClusterRoleName(pr.GetName()),
			Labels: map[string]string{
				keyProviderName: pr.GetName(),
			},
		},
		Rules: append(append(withVerbs(rules, verbsSystem), ruleFinalizers), rulesSystemExtra...),
	}

	roles := []rbacv1.ClusterRole{*edit, *view, *system}
	for i := range roles {
		ref := meta.AsController(meta.TypedReferenceTo(pr, v1.ProviderRevisionGroupVersionKind))
		roles[i].SetOwnerReferences([]metav1.OwnerReference{ref})
	}

	return roles
}

// withVerbs returns a copy of the supplied rules with each rule's verbs set to
// the supplied verbs.
func withVerbs(r []rbacv1.PolicyRule, verbs []string) []rbacv1.PolicyRule {
	verbal := make([]rbacv1.PolicyRule, len(r))
	for i := range r {
		verbal[i] = r[i]
		verbal[i].Verbs = verbs
	}

	return verbal
}
/* Copyright 2020 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package roles import ( "context" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "github.com/crossplane/crossplane/apis/pkg/v1" ) type adder interface { Add(item reconcile.Request) } // EnqueueRequestForAllRevisionsInFamily enqueues a request for all // provider revisions with the same family as one that changed. type EnqueueRequestForAllRevisionsInFamily struct { client client.Client } // Create enqueues a request for all provider revisions within the same family. func (e *EnqueueRequestForAllRevisionsInFamily) Create(ctx context.Context, evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { e.add(ctx, evt.Object, q) } // Update enqueues a request for all provider revisions within the same family. func (e *EnqueueRequestForAllRevisionsInFamily) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { e.add(ctx, evt.ObjectOld, q) e.add(ctx, evt.ObjectNew, q) } // Delete enqueues a request for all provider revisions within the same family. 
// Delete enqueues a request for all provider revisions within the same family.
func (e *EnqueueRequestForAllRevisionsInFamily) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	e.add(ctx, evt.Object, q)
}

// Generic enqueues a request for all provider revisions within the same family.
func (e *EnqueueRequestForAllRevisionsInFamily) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
	e.add(ctx, evt.Object, q)
}

// add enqueues one reconcile request per provider revision that shares a
// family label with the supplied object. Objects that are not provider
// revisions, or that carry no family label, are ignored.
func (e *EnqueueRequestForAllRevisionsInFamily) add(ctx context.Context, obj runtime.Object, queue adder) {
	rev, ok := obj.(*v1.ProviderRevision)
	if !ok {
		return
	}

	family := rev.GetLabels()[v1.LabelProviderFamily]
	if family == "" {
		// This revision is not part of a family; nothing to fan out to.
		return
	}

	revisions := &v1.ProviderRevisionList{}
	if err := e.client.List(ctx, revisions, client.MatchingLabels{v1.LabelProviderFamily: family}); err != nil {
		// TODO(negz): Handle this error?
		return
	}

	for i := range revisions.Items {
		sibling := &revisions.Items[i]
		if sibling.GetUID() == rev.GetUID() {
			// The revision that triggered this event doesn't need another
			// request enqueued for it.
			continue
		}
		queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: sibling.GetName()}})
	}
}
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package dag implements a Directed Acyclic Graph for Package dependencies.
package dag

import (
	"github.com/crossplane/crossplane-runtime/pkg/errors"
)

// Node is a node in DAG.
type Node interface { //nolint:interfacebloat // NOTE(ezgidemirel): Interface is extended to support package version update capability.
	// Identifier returns the node's unique identifier within the graph.
	Identifier() string
	// Neighbors returns the node's direct neighbors (outgoing edges).
	Neighbors() []Node
	// GetConstraints Returns the version or constraint of the package.
	GetConstraints() string
	// GetParentConstraints Returns the version or constraint of the package which comes from its parents.
	GetParentConstraints() []string
	// AddParentConstraints records additional constraints imposed by parents.
	AddParentConstraints(c []string)
	// Node implementations should be careful to establish uniqueness of
	// neighbors in their AddNeighbors method or risk counting a neighbor
	// multiple times.
	AddNeighbors(ns ...Node) error
}

// DAG is a Directed Acyclic Graph.
type DAG interface { //nolint:interfacebloat // TODO(negz): Could this be several smaller interfaces?
	Init(ns []Node) ([]Node, error)
	AddNode(n Node) error
	AddNodes(ns ...Node) error
	AddOrUpdateNodes(ns ...Node)
	GetNode(identifier string) (Node, error)
	AddEdge(from string, to Node) (bool, error)
	AddEdges(edges map[string][]Node) ([]Node, error)
	NodeExists(identifier string) bool
	NodeNeighbors(identifier string) ([]Node, error)
	TraceNode(identifier string) (map[string]Node, error)
	Sort() ([]string, error)
}

// MapDag is a directed acyclic graph implementation that uses a map for its
// underlying data structure.
type MapDag struct {
	// nodes indexes every node in the graph by its identifier.
	nodes map[string]Node
}

// NewDAGFn is a function that returns a DAG.
type NewDAGFn func() DAG

// NewMapDag creates a new MapDag.
func NewMapDag() DAG { return &MapDag{nodes: map[string]Node{}} }

// Init initializes a MapDag and implies missing destination nodes. Any implied
// nodes are returned. Any existing nodes are cleared.
func (d *MapDag) Init(nodes []Node) ([]Node, error) {
	d.nodes = map[string]Node{}
	// Add all nodes before adding edges so we know what nodes were implied.
	for _, node := range nodes {
		if err := d.AddNode(node); err != nil {
			return nil, err
		}
	}
	var implied []Node
	for _, node := range nodes {
		miss, err := d.AddEdges(map[string][]Node{
			node.Identifier(): node.Neighbors(),
		})
		if err != nil {
			return nil, err
		}
		// Edge destinations that were not in the supplied node set were
		// created on the fly by AddEdge; report them to the caller.
		implied = append(implied, miss...)
	}
	return implied, nil
}

// AddNodes adds nodes to the graph.
func (d *MapDag) AddNodes(nodes ...Node) error {
	for _, n := range nodes {
		if err := d.AddNode(n); err != nil {
			return err
		}
	}
	return nil
}

// AddNode adds a node to the graph. It returns an error if a node with the
// same identifier already exists.
func (d *MapDag) AddNode(node Node) error {
	if _, ok := d.nodes[node.Identifier()]; ok {
		return errors.Errorf("node %s already exists", node.Identifier())
	}
	d.nodes[node.Identifier()] = node
	return nil
}

// AddOrUpdateNodes adds new nodes or updates the existing ones with the same
// identifier.
func (d *MapDag) AddOrUpdateNodes(nodes ...Node) {
	for _, node := range nodes {
		d.nodes[node.Identifier()] = node
	}
}

// NodeExists checks whether a node exists.
func (d *MapDag) NodeExists(identifier string) bool {
	_, exists := d.nodes[identifier]
	return exists
}

// NodeNeighbors returns a node's neighbors.
func (d *MapDag) NodeNeighbors(identifier string) ([]Node, error) {
	if _, ok := d.nodes[identifier]; !ok {
		return nil, errors.Errorf("node %s does not exist", identifier)
	}
	return d.nodes[identifier].Neighbors(), nil
}

// TraceNode returns a node's neighbors and all transitive neighbors using depth
// first search.
func (d *MapDag) TraceNode(identifier string) (map[string]Node, error) {
	tree := map[string]Node{}
	if err := d.traceNode(identifier, tree); err != nil {
		return nil, err
	}
	return tree, nil
}

// traceNode is the recursive worker for TraceNode. It records all transitive
// neighbors of identifier in tree, which doubles as the visited set.
func (d *MapDag) traceNode(identifier string, tree map[string]Node) error {
	if d.nodes[identifier] == nil {
		return errors.New("missing node in tree")
	}
	for _, n := range d.nodes[identifier].Neighbors() {
		// if we have already visited this neighbor, then we have already
		// visited its neighbors, so we can skip.
		if _, ok := tree[n.Identifier()]; ok {
			continue
		}
		tree[n.Identifier()] = n
		if err := d.traceNode(n.Identifier(), tree); err != nil {
			return err
		}
	}
	return nil
}

// GetNode returns a node in the dag.
func (d *MapDag) GetNode(identifier string) (Node, error) {
	if _, ok := d.nodes[identifier]; !ok {
		return nil, errors.Errorf("node %s does not exist", identifier)
	}
	return d.nodes[identifier], nil
}

// AddEdges adds edges to the graph. Destination nodes that did not yet exist
// are created (implied) and returned.
func (d *MapDag) AddEdges(edges map[string][]Node) ([]Node, error) {
	var missing []Node
	for f, ne := range edges {
		for _, e := range ne {
			implied, err := d.AddEdge(f, e)
			// Record the implied node before checking the error so callers
			// see it even when AddNeighbors failed.
			if implied {
				missing = append(missing, e)
			}
			if err != nil {
				return nil, err
			}
		}
	}
	return missing, nil
}

// AddEdge adds an edge to the graph.
func (d *MapDag) AddEdge(from string, to Node) (bool, error) { if _, ok := d.nodes[from]; !ok { return false, errors.Errorf("node %s does not exist", to) } implied := false if _, ok := d.nodes[to.Identifier()]; !ok { implied = true if err := d.AddNode(to); err != nil { return implied, err } } return implied, d.nodes[from].AddNeighbors(to) } // Sort performs topological sort on the graph. func (d *MapDag) Sort() ([]string, error) { visited := map[string]bool{} results := make([]string, len(d.nodes)) for n, node := range d.nodes { if !visited[n] { stack := map[string]bool{} if err := d.visit(n, node.Neighbors(), stack, visited, results); err != nil { return nil, err } } } return results, nil } func (d *MapDag) visit(name string, neighbors []Node, stack map[string]bool, visited map[string]bool, results []string) error { visited[name] = true stack[name] = true for _, n := range neighbors { if !visited[n.Identifier()] { if _, ok := d.nodes[n.Identifier()]; !ok { return errors.Errorf("node %q does not exist", n.Identifier()) } if err := d.visit(n.Identifier(), d.nodes[n.Identifier()].Neighbors(), stack, visited, results); err != nil { return err } } else if stack[n.Identifier()] { return errors.Errorf("detected cycle on: %s", n.Identifier()) } } for i, r := range results { if r == "" { results[i] = name break } } stack[name] = false return nil }
/* Copyright 2023 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package dag import ( "errors" "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" ) type SimpleFuzzNode struct { IdentifierString string NeighborsField map[string]SimpleFuzzNode } func toNodesFuzz(n []SimpleFuzzNode) []Node { nodes := make([]Node, len(n)) for i, r := range n { nodes[i] = &r } return nodes } func (s *SimpleFuzzNode) AddNeighbors(nodes ...Node) error { for _, n := range nodes { sn, ok := n.(*SimpleFuzzNode) if !ok { return errors.New("not a simple node") } if s.NeighborsField == nil { s.NeighborsField = make(map[string]SimpleFuzzNode) } s.NeighborsField[sn.Identifier()] = *sn } return nil } func (s *SimpleFuzzNode) Identifier() string { return s.IdentifierString } func (s *SimpleFuzzNode) Neighbors() []Node { nodes := make([]Node, len(s.NeighborsField)) i := 0 for _, r := range s.NeighborsField { nodes[i] = &r i++ } return nodes } func (s *SimpleFuzzNode) GetConstraints() string { return "" } func (s *SimpleFuzzNode) GetParentConstraints() []string { return nil } func (s *SimpleFuzzNode) AddParentConstraints([]string) {} func FuzzDag(f *testing.F) { f.Fuzz(func(_ *testing.T, data []byte) { c := fuzz.NewConsumer(data) nodes := make([]SimpleFuzzNode, 0) err := c.CreateSlice(&nodes) if err != nil { return } for _, n := range nodes { if n.NeighborsField == nil { n.NeighborsField = make(map[string]SimpleFuzzNode) } } d := NewMapDag() _, _ = d.Init(toNodesFuzz(nodes)) identifier, err := c.GetString() 
if err != nil { return } d.Sort() _, _ = d.TraceNode(identifier) d.Sort() from, err := c.GetString() if err != nil { return } fuzzNode := &SimpleFuzzNode{} c.GenerateStruct(fuzzNode) _, _ = d.AddEdge(from, fuzzNode) d.Sort() d.NodeNeighbors(identifier) }) }
/*
Copyright 2024 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package dag implements a Directed Acyclic Graph for Package dependencies.
package dag

import (
	"github.com/Masterminds/semver"

	"github.com/crossplane/crossplane-runtime/pkg/errors"
)

// MapUpgradingDag is a directed acyclic graph implementation that uses a map for its
// underlying data structure and has the ability to distinguish upgradable nodes.
type MapUpgradingDag struct {
	// nodes indexes every node in the graph by its identifier.
	nodes map[string]Node
}

// NewUpgradingDAGFn is a function that returns a DAG.
type NewUpgradingDAGFn func() DAG

// NewUpgradingMapDag creates a new MapDag.
func NewUpgradingMapDag() DAG { return &MapUpgradingDag{nodes: map[string]Node{}} }

// Init initializes a MapDag and implies missing destination nodes. Any implied
// nodes are returned. Any existing nodes are cleared.
func (d *MapUpgradingDag) Init(nodes []Node) ([]Node, error) {
	d.nodes = map[string]Node{}
	// Add all nodes before adding edges so we know what nodes were implied.
	for _, node := range nodes {
		if err := d.AddNode(node); err != nil {
			return nil, err
		}
	}
	var implied []Node
	for _, node := range nodes {
		miss, err := d.AddEdges(map[string][]Node{
			node.Identifier(): node.Neighbors(),
		})
		if err != nil {
			return nil, err
		}
		// Edge destinations that were not in the supplied node set were
		// created on the fly by AddEdge; report them to the caller.
		implied = append(implied, miss...)
	}
	return implied, nil
}

// AddNodes adds nodes to the graph.
func (d *MapUpgradingDag) AddNodes(nodes ...Node) error {
	for _, n := range nodes {
		if err := d.AddNode(n); err != nil {
			return err
		}
	}
	return nil
}

// AddNode adds a node to the graph. It returns an error if a node with the
// same identifier already exists.
func (d *MapUpgradingDag) AddNode(node Node) error {
	if _, ok := d.nodes[node.Identifier()]; ok {
		return errors.Errorf("node %s already exists", node.Identifier())
	}
	d.nodes[node.Identifier()] = node
	return nil
}

// AddOrUpdateNodes adds new nodes or updates the existing ones with the same
// identifier.
func (d *MapUpgradingDag) AddOrUpdateNodes(nodes ...Node) {
	for _, node := range nodes {
		if _, ok := d.nodes[node.Identifier()]; ok {
			// Carry over the parent constraints accumulated on the node
			// being replaced so the update doesn't lose them.
			node.AddParentConstraints(d.nodes[node.Identifier()].GetParentConstraints())
		}
		d.nodes[node.Identifier()] = node
	}
}

// NodeExists checks whether a node exists.
func (d *MapUpgradingDag) NodeExists(identifier string) bool {
	_, exists := d.nodes[identifier]
	return exists
}

// NodeNeighbors returns a node's neighbors.
func (d *MapUpgradingDag) NodeNeighbors(identifier string) ([]Node, error) {
	if _, ok := d.nodes[identifier]; !ok {
		return nil, errors.Errorf("node %s does not exist", identifier)
	}
	return d.nodes[identifier].Neighbors(), nil
}

// TraceNode returns a node's neighbors and all transitive neighbors using depth
// first search.
func (d *MapUpgradingDag) TraceNode(identifier string) (map[string]Node, error) {
	tree := map[string]Node{}
	if err := d.traceNode(identifier, tree); err != nil {
		return nil, err
	}
	return tree, nil
}

// traceNode is the recursive worker for TraceNode. It records all transitive
// neighbors of identifier in tree, which doubles as the visited set.
func (d *MapUpgradingDag) traceNode(identifier string, tree map[string]Node) error {
	if d.nodes[identifier] == nil {
		return errors.New("missing node in tree")
	}
	for _, n := range d.nodes[identifier].Neighbors() {
		// if we have already visited this neighbor, then we have already
		// visited its neighbors, so we can skip.
		if _, ok := tree[n.Identifier()]; ok {
			continue
		}
		tree[n.Identifier()] = n
		if err := d.traceNode(n.Identifier(), tree); err != nil {
			return err
		}
	}
	return nil
}

// GetNode returns a node in the dag.
func (d *MapUpgradingDag) GetNode(identifier string) (Node, error) { if _, ok := d.nodes[identifier]; !ok { return nil, errors.Errorf("node %s does not exist", identifier) } return d.nodes[identifier], nil } // AddEdges adds edges to the graph. func (d *MapUpgradingDag) AddEdges(edges map[string][]Node) ([]Node, error) { var missing []Node for f, ne := range edges { for _, e := range ne { implied, err := d.AddEdge(f, e) if implied { missing = append(missing, e) } if err != nil { return nil, err } } } return missing, nil } // AddEdge adds an edge to the graph and returns if we need to check for updates. func (d *MapUpgradingDag) AddEdge(from string, to Node) (bool, error) { if _, ok := d.nodes[from]; !ok { return false, errors.Errorf("node %s does not exist", to) } implied := false orgTo, ok := d.nodes[to.Identifier()] if !ok { implied = true if err := d.AddNode(to); err != nil { return implied, err } } else if !isValidConstraints(orgTo, to) { // check if upgrade is needed err := d.nodes[from].AddNeighbors(to) n := d.nodes[to.Identifier()] n.AddParentConstraints(to.GetParentConstraints()) return true, err } err := d.nodes[from].AddNeighbors(to) d.nodes[to.Identifier()].AddParentConstraints(to.GetParentConstraints()) return implied, err } // Sort performs topological sort on the graph. 
func (d *MapUpgradingDag) Sort() ([]string, error) { visited := map[string]bool{} results := make([]string, len(d.nodes)) for n, node := range d.nodes { if !visited[n] { stack := map[string]bool{} if err := d.visit(n, node.Neighbors(), stack, visited, results); err != nil { return nil, err } } } return results, nil } func (d *MapUpgradingDag) visit(name string, neighbors []Node, stack map[string]bool, visited map[string]bool, results []string) error { visited[name] = true stack[name] = true for _, n := range neighbors { if !visited[n.Identifier()] { if _, ok := d.nodes[n.Identifier()]; !ok { return errors.Errorf("node %q does not exist", n.Identifier()) } if err := d.visit(n.Identifier(), d.nodes[n.Identifier()].Neighbors(), stack, visited, results); err != nil { return err } } else if stack[n.Identifier()] { return errors.Errorf("detected cycle on: %s", n.Identifier()) } } for i, r := range results { if r == "" { results[i] = name break } } stack[name] = false return nil } func isValidConstraints(installed, wanted Node) bool { // NOTE(ezgidemirel): This condition also satisfies digests if installed.GetConstraints() == wanted.GetConstraints() { return true } c, err := semver.NewConstraint(wanted.GetConstraints()) if err != nil { return false } v, err := semver.NewVersion(installed.GetConstraints()) if err != nil { return false } if !c.Check(v) { return false } return true }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package xcrd generates CustomResourceDefinitions from Crossplane definitions.
//
// v1.JSONSchemaProps is incompatible with controller-tools (as of 0.2.4)
// because it is missing JSON tags and uses float64, which is a disallowed type.
// We thus copy the entire struct as CRDSpecTemplate. See the below issue:
// https://github.com/kubernetes-sigs/controller-tools/issues/291
package xcrd

import (
	"encoding/json"

	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/meta"

	v1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
)

// Category names for generated claim and composite CRDs.
const (
	CategoryClaim     = "claim"
	CategoryComposite = "composite"
)

// Error strings used by this package.
const (
	errFmtGenCrd                   = "cannot generate CRD for %q %q"
	errParseValidation             = "cannot parse validation schema"
	errInvalidClaimNames           = "invalid resource claim names"
	errMissingClaimNames           = "missing names"
	errFmtConflictingClaimName     = "%q conflicts with composite resource name"
	errCustomResourceValidationNil = "custom resource validation cannot be nil"
)

// ForCompositeResource derives the CustomResourceDefinition for a composite
// resource from the supplied CompositeResourceDefinition.
func ForCompositeResource(xrd *v1.CompositeResourceDefinition) (*extv1.CustomResourceDefinition, error) {
	crd := &extv1.CustomResourceDefinition{
		Spec: extv1.CustomResourceDefinitionSpec{
			Group:      xrd.Spec.Group,
			Names:      xrd.Spec.Names,
			Versions:   make([]extv1.CustomResourceDefinitionVersion, len(xrd.Spec.Versions)),
			Conversion: xrd.Spec.Conversion,
		},
	}

	crd.SetName(xrd.GetName())
	setCrdMetadata(crd, xrd)
	// The XRD owns (and controls) the CRD it generates.
	crd.SetOwnerReferences([]metav1.OwnerReference{meta.AsController(
		meta.TypedReferenceTo(xrd, v1.CompositeResourceDefinitionGroupVersionKind),
	)})

	// Default to the legacy cluster scope when the XRD doesn't specify one.
	scope := ptr.Deref(xrd.Spec.Scope, v1.CompositeResourceScopeLegacyCluster)
	switch scope {
	case v1.CompositeResourceScopeNamespaced:
		crd.Spec.Scope = extv1.NamespaceScoped
	case v1.CompositeResourceScopeCluster:
		crd.Spec.Scope = extv1.ClusterScoped
	case v1.CompositeResourceScopeLegacyCluster:
		crd.Spec.Scope = extv1.ClusterScoped
	}

	crd.Spec.Names.Categories = append(crd.Spec.Names.Categories, CategoryComposite)

	// The composite name is used as a label value, so we must ensure it is
	// no longer than 63 characters (the Kubernetes label value limit).
	const maxCompositeNameLength = 63

	for i, vr := range xrd.Spec.Versions {
		crdv, err := genCrdVersion(vr, maxCompositeNameLength)
		if err != nil {
			return nil, errors.Wrapf(err, errFmtGenCrd, "Composite Resource", xrd.Name)
		}
		crdv.AdditionalPrinterColumns = append(crdv.AdditionalPrinterColumns, CompositeResourcePrinterColumns(scope)...)
		// Merge Crossplane's machinery fields into the user's spec schema.
		props := CompositeResourceSpecProps(scope, xrd.Spec.DefaultCompositionUpdatePolicy)
		for k, v := range props {
			crdv.Schema.OpenAPIV3Schema.Properties["spec"].Properties[k] = v
		}
		// And into the status schema.
		props = CompositeResourceStatusProps(scope)
		for k, v := range props {
			crdv.Schema.OpenAPIV3Schema.Properties["status"].Properties[k] = v
		}
		crd.Spec.Versions[i] = *crdv
	}

	return crd, nil
}

// ForCompositeResourceClaim derives the CustomResourceDefinition for a
// composite resource claim from the supplied CompositeResourceDefinition.
func ForCompositeResourceClaim(xrd *v1.CompositeResourceDefinition) (*extv1.CustomResourceDefinition, error) {
	if err := validateClaimNames(xrd); err != nil {
		return nil, errors.Wrap(err, errInvalidClaimNames)
	}

	crd := &extv1.CustomResourceDefinition{
		Spec: extv1.CustomResourceDefinitionSpec{
			// Claims are always namespaced.
			Scope:      extv1.NamespaceScoped,
			Group:      xrd.Spec.Group,
			Names:      *xrd.Spec.ClaimNames,
			Versions:   make([]extv1.CustomResourceDefinitionVersion, len(xrd.Spec.Versions)),
			Conversion: xrd.Spec.Conversion,
		},
	}

	crd.SetName(xrd.Spec.ClaimNames.Plural + "." + xrd.Spec.Group)
	setCrdMetadata(crd, xrd)
	// The XRD owns (and controls) the CRD it generates.
	crd.SetOwnerReferences([]metav1.OwnerReference{meta.AsController(
		meta.TypedReferenceTo(xrd, v1.CompositeResourceDefinitionGroupVersionKind),
	)})

	crd.Spec.Names.Categories = append(crd.Spec.Names.Categories, CategoryClaim)

	// 63 because the names are used as label values. We don't put 63-6
	// (generateName suffix length) here because the name generator shortens
	// the base to 57 automatically before appending the suffix.
	const maxClaimNameLength = 63

	for i, vr := range xrd.Spec.Versions {
		crdv, err := genCrdVersion(vr, maxClaimNameLength)
		if err != nil {
			return nil, errors.Wrapf(err, errFmtGenCrd, "Composite Resource Claim", xrd.Name)
		}
		crdv.AdditionalPrinterColumns = append(crdv.AdditionalPrinterColumns, CompositeResourceClaimPrinterColumns()...)
		// Merge Crossplane's claim machinery fields into the user's spec schema.
		props := CompositeResourceClaimSpecProps(xrd.Spec.DefaultCompositeDeletePolicy)
		for k, v := range props {
			crdv.Schema.OpenAPIV3Schema.Properties["spec"].Properties[k] = v
		}
		// TODO(negz): This means claims will have status.claimConditionTypes.
		// I think that's a bug - only XRs should have that field.
		props = CompositeResourceStatusProps(v1.CompositeResourceScopeLegacyCluster)
		for k, v := range props {
			crdv.Schema.OpenAPIV3Schema.Properties["status"].Properties[k] = v
		}
		crd.Spec.Versions[i] = *crdv
	}

	return crd, nil
}

// genCrdVersion renders a single CRD version from the supplied XRD version.
// It starts from BaseProps, merges in the user-supplied OpenAPI schema for
// spec and status, and caps metadata.name at maxNameLength.
func genCrdVersion(vr v1.CompositeResourceDefinitionVersion, maxNameLength int64) (*extv1.CustomResourceDefinitionVersion, error) {
	crdv := extv1.CustomResourceDefinitionVersion{
		Name:                     vr.Name,
		Served:                   vr.Served,
		Storage:                  vr.Referenceable,
		Deprecated:               ptr.Deref(vr.Deprecated, false),
		DeprecationWarning:       vr.DeprecationWarning,
		AdditionalPrinterColumns: vr.AdditionalPrinterColumns,
		Schema: &extv1.CustomResourceValidation{
			OpenAPIV3Schema: BaseProps(),
		},
		Subresources: &extv1.CustomResourceSubresources{
			Status: &extv1.CustomResourceSubresourceStatus{},
		},
	}
	s, err := parseSchema(vr.Schema)
	if err != nil {
		return nil, errors.Wrapf(err, errParseValidation)
	}
	if s == nil {
		return nil, errors.New(errCustomResourceValidationNil)
	}

	crdv.Schema.OpenAPIV3Schema.Description = s.Description

	// Honor a stricter (smaller) metadata.name length limit from the
	// user-supplied schema, if one was set.
	maxLength := maxNameLength
	if old := s.Properties["metadata"].Properties["name"].MaxLength; old != nil && *old < maxLength {
		maxLength = *old
	}

	xName := crdv.Schema.OpenAPIV3Schema.Properties["metadata"].Properties["name"]
	xName.MaxLength = ptr.To(maxLength)
	xName.Type = "string"
	xMetaData := crdv.Schema.OpenAPIV3Schema.Properties["metadata"]
	xMetaData.Properties = map[string]extv1.JSONSchemaProps{"name": xName}
	crdv.Schema.OpenAPIV3Schema.Properties["metadata"] = xMetaData

	// Merge the user's spec schema into the base spec props.
	xSpec := s.Properties["spec"]
	cSpec := crdv.Schema.OpenAPIV3Schema.Properties["spec"]
	cSpec.Required = append(cSpec.Required, xSpec.Required...)
	cSpec.XPreserveUnknownFields = xSpec.XPreserveUnknownFields
	cSpec.XValidations = append(cSpec.XValidations, xSpec.XValidations...)
	cSpec.OneOf = append(cSpec.OneOf, xSpec.OneOf...)
	cSpec.Description = xSpec.Description
	for k, v := range xSpec.Properties {
		cSpec.Properties[k] = v
	}
	crdv.Schema.OpenAPIV3Schema.Properties["spec"] = cSpec

	// Merge the user's status schema into the base status props.
	xStatus := s.Properties["status"]
	cStatus := crdv.Schema.OpenAPIV3Schema.Properties["status"]
	cStatus.Required = xStatus.Required
	cStatus.XValidations = xStatus.XValidations
	cStatus.Description = xStatus.Description
	cStatus.OneOf = xStatus.OneOf
	for k, v := range xStatus.Properties {
		cStatus.Properties[k] = v
	}
	crdv.Schema.OpenAPIV3Schema.Properties["status"] = cStatus

	return &crdv, nil
}

// validateClaimNames rejects claim names that are missing or that collide
// with the composite resource's own names.
func validateClaimNames(d *v1.CompositeResourceDefinition) error {
	if d.Spec.ClaimNames == nil {
		return errors.New(errMissingClaimNames)
	}
	if n := d.Spec.ClaimNames.Kind; n == d.Spec.Names.Kind {
		return errors.Errorf(errFmtConflictingClaimName, n)
	}
	if n := d.Spec.ClaimNames.Plural; n == d.Spec.Names.Plural {
		return errors.Errorf(errFmtConflictingClaimName, n)
	}
	// Singular and ListKind are optional; only compare when set.
	if n := d.Spec.ClaimNames.Singular; n != "" && n == d.Spec.Names.Singular {
		return errors.Errorf(errFmtConflictingClaimName, n)
	}
	if n := d.Spec.ClaimNames.ListKind; n != "" && n == d.Spec.Names.ListKind {
		return errors.Errorf(errFmtConflictingClaimName, n)
	}
	return nil
}

// parseSchema unmarshals the raw OpenAPI v3 schema of an XRD version. It
// returns nil (and no error) when v is nil.
func parseSchema(v *v1.CompositeResourceValidation) (*extv1.JSONSchemaProps, error) {
	if v == nil {
		return nil, nil
	}
	s := &extv1.JSONSchemaProps{}
	if err := json.Unmarshal(v.OpenAPIV3Schema.Raw, s); err != nil {
		return nil, errors.Wrap(err, errParseValidation)
	}
	return s, nil
}

// setCrdMetadata sets the labels and annotations on the CRD.
func setCrdMetadata(crd *extv1.CustomResourceDefinition, xrd *v1.CompositeResourceDefinition) *extv1.CustomResourceDefinition { crd.SetLabels(xrd.GetLabels()) if xrd.Spec.Metadata != nil { if xrd.Spec.Metadata.Labels != nil { inheritedLabels := crd.GetLabels() if inheritedLabels == nil { inheritedLabels = map[string]string{} } for k, v := range xrd.Spec.Metadata.Labels { inheritedLabels[k] = v } crd.SetLabels(inheritedLabels) } if xrd.Spec.Metadata.Annotations != nil { crd.SetAnnotations(xrd.Spec.Metadata.Annotations) } } return crd } // IsEstablished is a helper function to check whether api-server is ready // to accept the instances of registered CRD. func IsEstablished(s extv1.CustomResourceDefinitionStatus) bool { for _, c := range s.Conditions { if c.Type == extv1.Established { return c.Status == extv1.ConditionTrue } } return false }
/* Copyright 2023 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package xcrd import ( "testing" fuzz "github.com/AdaLogics/go-fuzz-headers" v1 "github.com/crossplane/crossplane/apis/apiextensions/v1" ) func FuzzForCompositeResourceXcrd(f *testing.F) { f.Fuzz(func(_ *testing.T, data []byte) { ff := fuzz.NewConsumer(data) xrd := &v1.CompositeResourceDefinition{} err := ff.GenerateStruct(xrd) if err != nil { return } _, _ = ForCompositeResource(xrd) }) } func FuzzForCompositeResourceClaim(f *testing.F) { f.Fuzz(func(_ *testing.T, data []byte) { ff := fuzz.NewConsumer(data) xrd := &v1.CompositeResourceDefinition{} err := ff.GenerateStruct(xrd) if err != nil { return } _, _ = ForCompositeResourceClaim(xrd) }) }
/*
Copyright 2020 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xcrd

import (
	"fmt"

	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	"k8s.io/utils/ptr"

	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"

	v1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
)

// Label keys.
const (
	LabelKeyNamePrefixForComposed = "crossplane.io/composite"
	LabelKeyClaimName             = "crossplane.io/claim-name"
	LabelKeyClaimNamespace        = "crossplane.io/claim-namespace"
)

// CompositionRevisionRef should be propagated dynamically.
const CompositionRevisionRef = "compositionRevisionRef"

// PropagateSpecProps is the list of XRC spec properties to propagate
// when translating an XRC into an XR.
var PropagateSpecProps = []string{"compositionRef", "compositionSelector", "compositionUpdatePolicy", "compositionRevisionSelector"} //nolint:gochecknoglobals // We treat this as a constant.

// TODO(negz): Add descriptions to schema fields.

// BaseProps is a partial OpenAPIV3Schema for the spec fields that Crossplane
// expects to be present for all CRDs that it creates.
func BaseProps() *extv1.JSONSchemaProps {
	return &extv1.JSONSchemaProps{
		Type:     "object",
		Required: []string{"spec"},
		Properties: map[string]extv1.JSONSchemaProps{
			"apiVersion": {
				Type: "string",
			},
			"kind": {
				Type: "string",
			},
			"metadata": {
				// NOTE(muvaf): api-server takes care of validating
				// metadata.
				Type: "object",
			},
			"spec": {
				Type:       "object",
				Properties: map[string]extv1.JSONSchemaProps{},
			},
			"status": {
				Type:       "object",
				Properties: map[string]extv1.JSONSchemaProps{},
			},
		},
	}
}

// CompositeResourceSpecProps is a partial OpenAPIV3Schema for the spec fields
// that Crossplane expects to be present for all defined composite resources.
func CompositeResourceSpecProps(s v1.CompositeResourceScope, defaultPol *xpv1.UpdatePolicy) map[string]extv1.JSONSchemaProps {
	props := map[string]extv1.JSONSchemaProps{
		"compositionRef": {
			Type:     "object",
			Required: []string{"name"},
			Properties: map[string]extv1.JSONSchemaProps{
				"name": {Type: "string"},
			},
		},
		"compositionSelector": {
			Type:     "object",
			Required: []string{"matchLabels"},
			Properties: map[string]extv1.JSONSchemaProps{
				"matchLabels": {
					Type: "object",
					AdditionalProperties: &extv1.JSONSchemaPropsOrBool{
						Allows: true,
						Schema: &extv1.JSONSchemaProps{Type: "string"},
					},
				},
			},
		},
		"compositionRevisionRef": {
			Type:     "object",
			Required: []string{"name"},
			Properties: map[string]extv1.JSONSchemaProps{
				"name": {Type: "string"},
			},
		},
		"compositionRevisionSelector": {
			Type:     "object",
			Required: []string{"matchLabels"},
			Properties: map[string]extv1.JSONSchemaProps{
				"matchLabels": {
					Type: "object",
					AdditionalProperties: &extv1.JSONSchemaPropsOrBool{
						Allows: true,
						Schema: &extv1.JSONSchemaProps{Type: "string"},
					},
				},
			},
		},
		"compositionUpdatePolicy": {
			Type: "string",
			Enum: []extv1.JSON{
				{Raw: []byte(`"Automatic"`)},
				{Raw: []byte(`"Manual"`)},
			},
			// Default to the XRD-level policy, if one was set.
			Default: func() *extv1.JSON {
				if defaultPol == nil {
					return nil
				}
				return &extv1.JSON{Raw: []byte(fmt.Sprintf("\"%s\"", *defaultPol))}
			}(),
		},
		"resourceRefs": {
			Type: "array",
			Items: &extv1.JSONSchemaPropsOrArray{
				Schema: &extv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]extv1.JSONSchemaProps{
						"apiVersion": {Type: "string"},
						"name":       {Type: "string"},
						"namespace":  {Type: "string"},
						"kind":       {Type: "string"},
					},
					Required: []string{"apiVersion", "kind"},
				},
			},
			// Controllers should replace the entire resourceRefs array.
			XListType: ptr.To("atomic"),
		},
	}

	// Namespaced XRs don't get to reference composed resources in other
	// namespaces.
	if s == v1.CompositeResourceScopeNamespaced {
		props["resourceRefs"] = extv1.JSONSchemaProps{
			Type: "array",
			Items: &extv1.JSONSchemaPropsOrArray{
				Schema: &extv1.JSONSchemaProps{
					Type: "object",
					Properties: map[string]extv1.JSONSchemaProps{
						"apiVersion": {Type: "string"},
						"name":       {Type: "string"},
						"kind":       {Type: "string"},
					},
					Required: []string{"apiVersion", "kind"},
				},
			},
			// Controllers should replace the entire resourceRefs array.
			XListType: ptr.To("atomic"),
		}
	}

	// Legacy XRs have their Crossplane machinery fields directly under spec.
	// They also support referencing a claim, and writing a secret.
	if s == v1.CompositeResourceScopeLegacyCluster {
		props["claimRef"] = extv1.JSONSchemaProps{
			Type:     "object",
			Required: []string{"apiVersion", "kind", "namespace", "name"},
			Properties: map[string]extv1.JSONSchemaProps{
				"apiVersion": {Type: "string"},
				"kind":       {Type: "string"},
				"namespace":  {Type: "string"},
				"name":       {Type: "string"},
			},
		}
		props["writeConnectionSecretToRef"] = extv1.JSONSchemaProps{
			Type:     "object",
			Required: []string{"name", "namespace"},
			Properties: map[string]extv1.JSONSchemaProps{
				"name":      {Type: "string"},
				"namespace": {Type: "string"},
			},
		}
		return props
	}

	// Modern XRs nest their Crossplane machinery fields under spec.crossplane.
	return map[string]extv1.JSONSchemaProps{
		"crossplane": {
			Type:        "object",
			Description: "Configures how Crossplane will reconcile this composite resource",
			Properties:  props,
		},
	}
}

// CompositeResourceClaimSpecProps is a partial OpenAPIV3Schema for the spec
// fields that Crossplane expects to be present for all published infrastructure
// resources.
func CompositeResourceClaimSpecProps(defaultPol *xpv1.CompositeDeletePolicy) map[string]extv1.JSONSchemaProps { return map[string]extv1.JSONSchemaProps{ "compositionRef": { Type: "object", Required: []string{"name"}, Properties: map[string]extv1.JSONSchemaProps{ "name": {Type: "string"}, }, }, "compositionSelector": { Type: "object", Required: []string{"matchLabels"}, Properties: map[string]extv1.JSONSchemaProps{ "matchLabels": { Type: "object", AdditionalProperties: &extv1.JSONSchemaPropsOrBool{ Allows: true, Schema: &extv1.JSONSchemaProps{Type: "string"}, }, }, }, }, "compositionRevisionRef": { Type: "object", Required: []string{"name"}, Properties: map[string]extv1.JSONSchemaProps{ "name": {Type: "string"}, }, }, "compositionRevisionSelector": { Type: "object", Required: []string{"matchLabels"}, Properties: map[string]extv1.JSONSchemaProps{ "matchLabels": { Type: "object", AdditionalProperties: &extv1.JSONSchemaPropsOrBool{ Allows: true, Schema: &extv1.JSONSchemaProps{Type: "string"}, }, }, }, }, "compositionUpdatePolicy": { Type: "string", Enum: []extv1.JSON{ {Raw: []byte(`"Automatic"`)}, {Raw: []byte(`"Manual"`)}, }, }, "compositeDeletePolicy": { Type: "string", Enum: []extv1.JSON{ {Raw: []byte(`"Background"`)}, {Raw: []byte(`"Foreground"`)}, }, Default: func() *extv1.JSON { if defaultPol == nil { return nil } return &extv1.JSON{Raw: []byte(fmt.Sprintf("\"%s\"", *defaultPol))} }(), }, "resourceRef": { Type: "object", Required: []string{"apiVersion", "kind", "name"}, Properties: map[string]extv1.JSONSchemaProps{ "apiVersion": {Type: "string"}, "kind": {Type: "string"}, "name": {Type: "string"}, }, }, "writeConnectionSecretToRef": { Type: "object", Required: []string{"name"}, Properties: map[string]extv1.JSONSchemaProps{ "name": {Type: "string"}, }, }, } } // CompositeResourceStatusProps is a partial OpenAPIV3Schema for the status // fields that Crossplane expects to be present for all composite resources. 
func CompositeResourceStatusProps(s v1.CompositeResourceScope) map[string]extv1.JSONSchemaProps { props := map[string]extv1.JSONSchemaProps{ "conditions": { Description: "Conditions of the resource.", Type: "array", XListMapKeys: []string{ "type", }, XListType: ptr.To("map"), Items: &extv1.JSONSchemaPropsOrArray{ Schema: &extv1.JSONSchemaProps{ Type: "object", Required: []string{"lastTransitionTime", "reason", "status", "type"}, Properties: map[string]extv1.JSONSchemaProps{ "lastTransitionTime": {Type: "string", Format: "date-time"}, "message": {Type: "string"}, "reason": {Type: "string"}, "status": {Type: "string"}, "type": {Type: "string"}, "observedGeneration": {Type: "integer", Format: "int64"}, }, }, }, }, } switch s { case v1.CompositeResourceScopeNamespaced, v1.CompositeResourceScopeCluster: // Modern XRs use status.crossplane, and don't support claims. props["crossplane"] = extv1.JSONSchemaProps{ Type: "object", Description: "Indicates how Crossplane is reconciling this composite resource", Properties: map[string]extv1.JSONSchemaProps{ "connectionDetails": { Type: "object", Properties: map[string]extv1.JSONSchemaProps{ "lastPublishedTime": {Type: "string", Format: "date-time"}, }, }, }, } case v1.CompositeResourceScopeLegacyCluster: // Legacy XRs don't use status.crossplane, and support claims. props["connectionDetails"] = extv1.JSONSchemaProps{ Type: "object", Properties: map[string]extv1.JSONSchemaProps{ "lastPublishedTime": {Type: "string", Format: "date-time"}, }, } props["claimConditionTypes"] = extv1.JSONSchemaProps{ Type: "array", XListType: ptr.To("set"), Items: &extv1.JSONSchemaPropsOrArray{ Schema: &extv1.JSONSchemaProps{ Type: "string", }, }, } } return props } // CompositeResourcePrinterColumns returns the set of default printer columns // that should exist in all generated composite resource CRDs. 
func CompositeResourcePrinterColumns(s v1.CompositeResourceScope) []extv1.CustomResourceColumnDefinition { cols := []extv1.CustomResourceColumnDefinition{ { Name: "SYNCED", Type: "string", JSONPath: ".status.conditions[?(@.type=='Synced')].status", }, { Name: "READY", Type: "string", JSONPath: ".status.conditions[?(@.type=='Ready')].status", }, { Name: "COMPOSITION", Type: "string", JSONPath: ".spec.crossplane.compositionRef.name", }, { Name: "COMPOSITIONREVISION", Type: "string", JSONPath: ".spec.crossplane.compositionRevisionRef.name", Priority: 1, }, { Name: "AGE", Type: "date", JSONPath: ".metadata.creationTimestamp", }, } if s == v1.CompositeResourceScopeLegacyCluster { for i := range cols { if cols[i].Name == "COMPOSITION" { cols[i].JSONPath = ".spec.compositionRef.name" } if cols[i].Name == "COMPOSITIONREVISION" { cols[i].JSONPath = ".spec.compositionRevisionRef.name" } } } return cols } // CompositeResourceClaimPrinterColumns returns the set of default printer // columns that should exist in all generated composite resource claim CRDs. func CompositeResourceClaimPrinterColumns() []extv1.CustomResourceColumnDefinition { return []extv1.CustomResourceColumnDefinition{ { Name: "SYNCED", Type: "string", JSONPath: ".status.conditions[?(@.type=='Synced')].status", }, { Name: "READY", Type: "string", JSONPath: ".status.conditions[?(@.type=='Ready')].status", }, { Name: "CONNECTION-SECRET", Type: "string", JSONPath: ".spec.writeConnectionSecretToRef.name", }, { Name: "AGE", Type: "date", JSONPath: ".metadata.creationTimestamp", }, } } // GetPropFields returns the fields from a map of schema properties. func GetPropFields(props map[string]extv1.JSONSchemaProps) []string { propFields := make([]string, len(props)) i := 0 for k := range props { propFields[i] = k i++ } return propFields }