/*
Copyright 2024 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package source
import (
"bytes"
"context"
"errors"
"fmt"
"net/url"
"time"
"github.com/ProtonMail/go-crypto/openpgp"
"github.com/go-git/go-git/v5/plumbing/transport"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/auth"
authutils "github.com/fluxcd/pkg/auth/utils"
"github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/git/github"
"github.com/fluxcd/pkg/git/gogit"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
imagev1 "github.com/fluxcd/image-automation-controller/api/v1beta2"
)
const (
signingSecretKey = "git.asc"
signingPassphraseKey = "passphrase"
)
// gitSrcCfg contains all the Git configurations related to a source derived
// from the given configurations and the environment.
type gitSrcCfg struct {
srcKey types.NamespacedName
url string
pushBranch string
switchBranch bool
timeout *metav1.Duration
checkoutRef *sourcev1.GitRepositoryRef
authOpts *git.AuthOptions
clientOpts []gogit.ClientOption
signingEntity *openpgp.Entity
}
func buildGitConfig(ctx context.Context, c client.Client, originKey, srcKey types.NamespacedName, gitSpec *imagev1.GitSpec, opts SourceOptions) (*gitSrcCfg, error) {
cfg := &gitSrcCfg{
srcKey: srcKey,
}
// Get the repo.
repo := &sourcev1.GitRepository{}
if err := c.Get(ctx, srcKey, repo); err != nil {
if client.IgnoreNotFound(err) == nil {
return nil, fmt.Errorf("referenced git repository does not exist: %w", err)
}
}
cfg.url = repo.Spec.URL
// Configure Git operation timeout from the GitRepository configuration.
if repo.Spec.Timeout != nil {
cfg.timeout = repo.Spec.Timeout
} else {
cfg.timeout = &metav1.Duration{Duration: time.Minute}
}
// Get the checkout ref for the source, prioritizing the image automation
// object gitSpec checkout reference and falling back to the GitRepository
// reference if not provided.
// var checkoutRef *sourcev1.GitRepositoryRef
if gitSpec.Checkout != nil {
cfg.checkoutRef = &gitSpec.Checkout.Reference
} else if repo.Spec.Reference != nil {
cfg.checkoutRef = repo.Spec.Reference
} // else remain as `nil` and git.DefaultBranch will be used.
// Configure push first as the client options below depend on the push
// configuration.
if err := configurePush(cfg, gitSpec, cfg.checkoutRef); err != nil {
return nil, err
}
proxyOpts, proxyURL, err := getProxyOpts(ctx, c, repo)
if err != nil {
return nil, err
}
cfg.authOpts, err = getAuthOpts(ctx, c, repo, opts, proxyURL)
if err != nil {
return nil, err
}
cfg.clientOpts = []gogit.ClientOption{gogit.WithDiskStorage()}
if cfg.authOpts.Transport == git.HTTP {
cfg.clientOpts = append(cfg.clientOpts, gogit.WithInsecureCredentialsOverHTTP())
}
if proxyOpts != nil {
cfg.clientOpts = append(cfg.clientOpts, gogit.WithProxy(*proxyOpts))
}
// If the push branch is different from the checkout ref, we need to
// have all the references downloaded at clone time, to ensure that
// SwitchBranch will have access to the target branch state. fluxcd/flux2#3384
//
// To always overwrite the push branch, the feature gate
// GitAllBranchReferences can be set to false, which will cause
// the SwitchBranch operation to ignore the remote branch state.
if cfg.switchBranch {
cfg.clientOpts = append(cfg.clientOpts, gogit.WithSingleBranch(!opts.gitAllBranchReferences))
}
if gitSpec.Commit.SigningKey != nil {
if cfg.signingEntity, err = getSigningEntity(ctx, c, originKey.Namespace, gitSpec); err != nil {
return nil, err
}
}
return cfg, nil
}
func configurePush(cfg *gitSrcCfg, gitSpec *imagev1.GitSpec, checkoutRef *sourcev1.GitRepositoryRef) error {
if gitSpec.Push != nil && gitSpec.Push.Branch != "" {
cfg.pushBranch = gitSpec.Push.Branch
if checkoutRef != nil {
if cfg.pushBranch != checkoutRef.Branch {
cfg.switchBranch = true
}
} else {
// Compare with the git default branch when no checkout ref is
// explicitly defined.
if cfg.pushBranch != git.DefaultBranch {
cfg.switchBranch = true
}
}
return nil
}
// If no push branch is configured above, use the branch from checkoutRef.
// Here's where it gets constrained. If there's no push branch
// given, then the checkout ref must include a branch, and
// that can be used.
if checkoutRef == nil || checkoutRef.Branch == "" {
return errors.New("push spec not provided, and cannot be inferred from .spec.git.checkout.ref or GitRepository .spec.ref")
}
cfg.pushBranch = checkoutRef.Branch
return nil
}
func getAuthOpts(ctx context.Context, c client.Client, repo *sourcev1.GitRepository,
srcOpts SourceOptions, proxyURL *url.URL) (*git.AuthOptions, error) {
var data map[string][]byte
var err error
if repo.Spec.SecretRef != nil {
data, err = getSecretData(ctx, c, repo.Spec.SecretRef.Name, repo.GetNamespace())
if err != nil {
return nil, fmt.Errorf("failed to get auth secret '%s/%s': %w", repo.GetNamespace(), repo.Spec.SecretRef.Name, err)
}
}
u, err := url.Parse(repo.Spec.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse URL '%s': %w", repo.Spec.URL, err)
}
opts, err := git.NewAuthOptions(*u, data)
if err != nil {
return nil, fmt.Errorf("failed to configure authentication options: %w", err)
}
var getCreds func() (*authutils.GitCredentials, error)
switch provider := repo.GetProvider(); provider {
case sourcev1.GitProviderAzure: // If AWS or GCP are added in the future they can be added here separated by a comma.
getCreds = func() (*authutils.GitCredentials, error) {
var opts []auth.Option
if srcOpts.tokenCache != nil {
involvedObject := cache.InvolvedObject{
Kind: imagev1.ImageUpdateAutomationKind,
Name: srcOpts.objName,
Namespace: srcOpts.objNamespace,
Operation: cache.OperationReconcile,
}
opts = append(opts, auth.WithCache(*srcOpts.tokenCache, involvedObject))
}
if proxyURL != nil {
opts = append(opts, auth.WithProxyURL(*proxyURL))
}
return authutils.GetGitCredentials(ctx, provider, opts...)
}
case sourcev1.GitProviderGitHub:
// if provider is github, but secret ref is not specified
if repo.Spec.SecretRef == nil {
return nil, fmt.Errorf("secretRef with github app data must be specified when provider is set to github: %w", ErrInvalidSourceConfiguration)
}
getCreds = func() (*authutils.GitCredentials, error) {
var opts []github.OptFunc
if len(data) > 0 {
opts = append(opts, github.WithAppData(data))
}
if proxyURL != nil {
opts = append(opts, github.WithProxyURL(proxyURL))
}
if srcOpts.tokenCache != nil {
opts = append(opts, github.WithCache(srcOpts.tokenCache, imagev1.ImageUpdateAutomationKind,
srcOpts.objName, srcOpts.objNamespace, cache.OperationReconcile))
}
username, password, err := github.GetCredentials(ctx, opts...)
if err != nil {
return nil, err
}
return &authutils.GitCredentials{
Username: username,
Password: password,
}, nil
}
default:
// analyze secret, if it has github app data, perhaps provider should have been github.
if appID := data[github.AppIDKey]; len(appID) != 0 {
return nil, fmt.Errorf("secretRef '%s/%s' has github app data but provider is not set to github: %w", repo.GetNamespace(), repo.Spec.SecretRef.Name, ErrInvalidSourceConfiguration)
}
}
if getCreds != nil {
creds, err := getCreds()
if err != nil {
return nil, fmt.Errorf("failed to configure authentication options: %w", err)
}
opts.BearerToken = creds.BearerToken
opts.Username = creds.Username
opts.Password = creds.Password
}
return opts, nil
}
func getProxyOpts(ctx context.Context, c client.Client, repo *sourcev1.GitRepository) (*transport.ProxyOptions, *url.URL, error) {
if repo.Spec.ProxySecretRef == nil {
return nil, nil, nil
}
name := repo.Spec.ProxySecretRef.Name
namespace := repo.GetNamespace()
proxyData, err := getSecretData(ctx, c, name, namespace)
if err != nil {
return nil, nil, fmt.Errorf("failed to get proxy secret '%s/%s': %w", namespace, name, err)
}
b, ok := proxyData["address"]
if !ok {
return nil, nil, fmt.Errorf("invalid proxy secret '%s/%s': key 'address' is missing", namespace, name)
}
address := string(b)
username := string(proxyData["username"])
password := string(proxyData["password"])
proxyOpts := &transport.ProxyOptions{
URL: address,
Username: username,
Password: password,
}
proxyURL, err := url.Parse(string(address))
if err != nil {
return nil, nil, fmt.Errorf("invalid address in proxy secret '%s/%s': %w", namespace, name, err)
}
switch {
case username != "" && password == "":
proxyURL.User = url.User(username)
case username != "" && password != "":
proxyURL.User = url.UserPassword(username, password)
}
return proxyOpts, proxyURL, nil
}
func getSigningEntity(ctx context.Context, c client.Client, namespace string, gitSpec *imagev1.GitSpec) (*openpgp.Entity, error) {
secretName := gitSpec.Commit.SigningKey.SecretRef.Name
secretData, err := getSecretData(ctx, c, secretName, namespace)
if err != nil {
return nil, fmt.Errorf("could not find signing key secret '%s': %w", secretName, err)
}
data, ok := secretData[signingSecretKey]
if !ok {
return nil, fmt.Errorf("signing key secret '%s' does not contain a 'git.asc' key", secretName)
}
// Read entity from secret value
entities, err := openpgp.ReadArmoredKeyRing(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("could not read signing key from secret '%s': %w", secretName, err)
}
if len(entities) > 1 {
return nil, fmt.Errorf("multiple entities read from secret '%s', could not determine which signing key to use", secretName)
}
entity := entities[0]
if entity.PrivateKey != nil && entity.PrivateKey.Encrypted {
passphrase, ok := secretData[signingPassphraseKey]
if !ok {
return nil, fmt.Errorf("can not use passphrase protected signing key without '%s' field present in secret %s",
"passphrase", secretName)
}
if err = entity.PrivateKey.Decrypt([]byte(passphrase)); err != nil {
return nil, fmt.Errorf("could not decrypt private key of the signing key present in secret %s: %w", secretName, err)
}
}
return entity, nil
}
func getSecretData(ctx context.Context, c client.Client, name, namespace string) (map[string][]byte, error) {
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
var secret corev1.Secret
if err := c.Get(ctx, key, &secret); err != nil {
return nil, err
}
return secret.Data, nil
}
/*
Copyright 2024 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package source
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"text/template"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/git/gogit"
"github.com/fluxcd/pkg/git/repository"
"github.com/fluxcd/pkg/runtime/acl"
"github.com/go-git/go-git/v5/plumbing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/fluxcd/pkg/runtime/logger"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
imagev1 "github.com/fluxcd/image-automation-controller/api/v1beta2"
"github.com/fluxcd/image-automation-controller/pkg/update"
)
// ErrInvalidSourceConfiguration is an error for invalid source configuration.
var ErrInvalidSourceConfiguration = errors.New("invalid source configuration")
const defaultMessageTemplate = `Update from image update automation`
// TemplateData is the type of the value given to the commit message
// template.
type TemplateData struct {
AutomationObject types.NamespacedName
Updated update.Result
Changed update.ResultV2
Values map[string]string
}
// SourceManager manages source.
type SourceManager struct {
srcCfg *gitSrcCfg
automationObjKey types.NamespacedName
gitClient *gogit.Client
workingDir string
}
// SourceOptions contains the optional attributes of SourceManager.
type SourceOptions struct {
noCrossNamespaceRef bool
gitAllBranchReferences bool
tokenCache *cache.TokenCache
objName string
objNamespace string
}
// SourceOption configures the SourceManager options.
type SourceOption func(*SourceOptions)
// WithSourceOptionNoCrossNamespaceRef configures the SourceManager to disable
// cross namespace references.
func WithSourceOptionNoCrossNamespaceRef() SourceOption {
return func(so *SourceOptions) {
so.noCrossNamespaceRef = true
}
}
// WithSourceOptionGitAllBranchReferences configures the SourceManager to fetch
// all the Git branch references that are present in the remote repository.
func WithSourceOptionGitAllBranchReferences() SourceOption {
return func(so *SourceOptions) {
so.gitAllBranchReferences = true
}
}
// WithSourceOptionTokenCache configures the SourceManager to use the provided
// token cache.
func WithSourceOptionTokenCache(tc *cache.TokenCache) SourceOption {
return func(so *SourceOptions) {
so.tokenCache = tc
}
}
// WithSourceOptionInvolvedObject configures the SourceManager to use the
// provided ImageUpdateAutomation object.
func WithSourceOptionInvolvedObject(name, namespace string) SourceOption {
return func(so *SourceOptions) {
so.objName = name
so.objNamespace = namespace
}
}
// NewSourceManager takes all the provided inputs, validates them and returns a
// SourceManager which can be used to operate on the configured source.
func NewSourceManager(ctx context.Context, c client.Client, obj *imagev1.ImageUpdateAutomation, options ...SourceOption) (*SourceManager, error) {
opts := &SourceOptions{}
for _, o := range options {
o(opts)
}
// Only GitRepository source is supported.
if obj.Spec.SourceRef.Kind != sourcev1.GitRepositoryKind {
return nil, fmt.Errorf("source kind '%s' not supported: %w", obj.Spec.SourceRef.Kind, ErrInvalidSourceConfiguration)
}
if obj.Spec.GitSpec == nil {
return nil, fmt.Errorf("source kind '%s' necessitates field .spec.git: %w", sourcev1.GitRepositoryKind, ErrInvalidSourceConfiguration)
}
// Build source reference configuration to fetch and validate it.
srcNamespace := obj.GetNamespace()
if obj.Spec.SourceRef.Namespace != "" {
srcNamespace = obj.Spec.SourceRef.Namespace
}
// srcKey is the GitRepository object key.
srcKey := types.NamespacedName{Name: obj.Spec.SourceRef.Name, Namespace: srcNamespace}
// originKey is the update automation object key.
originKey := client.ObjectKeyFromObject(obj)
// Check if the source is accessible.
if opts.noCrossNamespaceRef && srcKey.Namespace != obj.GetNamespace() {
return nil, acl.AccessDeniedError(fmt.Sprintf("can't access '%s/%s', cross-namespace references have been blocked", sourcev1.GitRepositoryKind, srcKey))
}
gitSrcCfg, err := buildGitConfig(ctx, c, originKey, srcKey, obj.Spec.GitSpec, *opts)
if err != nil {
return nil, err
}
workDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", gitSrcCfg.srcKey.Namespace, gitSrcCfg.srcKey.Name))
if err != nil {
return nil, err
}
sm := &SourceManager{
srcCfg: gitSrcCfg,
automationObjKey: originKey,
workingDir: workDir,
}
return sm, nil
}
// CreateWorkingDirectory creates a working directory for the SourceManager.
func (sm SourceManager) WorkDirectory() string {
return sm.workingDir
}
// Cleanup deletes the working directory of the SourceManager.
func (sm SourceManager) Cleanup() error {
return os.RemoveAll(sm.workingDir)
}
// SwitchBranch returns if the checkout branch and push branch are different.
func (sm SourceManager) SwitchBranch() bool {
return sm.srcCfg.switchBranch
}
// CheckoutOption allows configuring the checkout options.
type CheckoutOption func(*repository.CloneConfig)
// WithCheckoutOptionLastObserved is a CheckoutOption option to configure the
// last observed commit.
func WithCheckoutOptionLastObserved(commit string) CheckoutOption {
return func(cc *repository.CloneConfig) {
cc.LastObservedCommit = commit
}
}
// WithCheckoutOptionShallowClone is a CheckoutOption option to configure
// shallow clone.
func WithCheckoutOptionShallowClone() CheckoutOption {
return func(cc *repository.CloneConfig) {
cc.ShallowClone = true
}
}
// WithCheckoutOptionSparseCheckoutDirectories is a CheckoutOption option to configure
// SparseCheckoutDirectories.
func WithCheckoutOptionSparseCheckoutDirectories(updatePath string) CheckoutOption {
return func(cc *repository.CloneConfig) {
cleanedPath := filepath.Clean(updatePath)
if cleanedPath == "." {
// Do not set SparseCheckoutDirectories if repository root is specified
return
}
cc.SparseCheckoutDirectories = []string{cleanedPath}
}
}
// CheckoutSource clones and checks out the source. If a push branch is
// configured that doesn't match with the checkout branch, a checkout to the
// push branch is also performed. This ensures any change and push operation
// following the checkout happens on the push branch.
func (sm *SourceManager) CheckoutSource(ctx context.Context, options ...CheckoutOption) (*git.Commit, error) {
// Configuration clone options.
cloneCfg := repository.CloneConfig{}
if sm.srcCfg.checkoutRef != nil {
cloneCfg.Tag = sm.srcCfg.checkoutRef.Tag
cloneCfg.SemVer = sm.srcCfg.checkoutRef.SemVer
cloneCfg.Commit = sm.srcCfg.checkoutRef.Commit
cloneCfg.Branch = sm.srcCfg.checkoutRef.Branch
}
// Apply checkout configurations.
for _, o := range options {
o(&cloneCfg)
}
var err error
sm.gitClient, err = gogit.NewClient(sm.workingDir, sm.srcCfg.authOpts, sm.srcCfg.clientOpts...)
if err != nil {
return nil, err
}
gitOpCtx, cancel := context.WithTimeout(ctx, sm.srcCfg.timeout.Duration)
defer cancel()
commit, err := sm.gitClient.Clone(gitOpCtx, sm.srcCfg.url, cloneCfg)
if err != nil {
return nil, err
}
if sm.srcCfg.switchBranch {
if err := sm.gitClient.SwitchBranch(gitOpCtx, sm.srcCfg.pushBranch); err != nil {
return nil, err
}
}
return commit, nil
}
// PushConfig configures the options used in push operation.
type PushConfig func(*repository.PushConfig)
// WithPushConfigForce configures the PushConfig to use force.
func WithPushConfigForce() PushConfig {
return func(pc *repository.PushConfig) {
pc.Force = true
}
}
// WithPushConfigOptions configures the PushConfig Options that are used in
// push.
func WithPushConfigOptions(opts map[string]string) PushConfig {
return func(pc *repository.PushConfig) {
pc.Options = opts
}
}
// CommitAndPush performs a commit in the source and pushes it to the remote
// repository.
func (sm SourceManager) CommitAndPush(ctx context.Context, obj *imagev1.ImageUpdateAutomation, policyResult update.ResultV2, pushOptions ...PushConfig) (*PushResult, error) {
tracelog := log.FromContext(ctx).V(logger.TraceLevel)
// Make sure there were file changes that need to be committed.
if len(policyResult.FileChanges) == 0 {
return nil, nil
}
// Perform a Git commit.
templateValues := &TemplateData{
AutomationObject: sm.automationObjKey,
Updated: policyResult.ImageResult,
Changed: policyResult,
Values: obj.Spec.GitSpec.Commit.MessageTemplateValues,
}
commitMsg, err := templateMsg(obj.Spec.GitSpec.Commit.MessageTemplate, templateValues)
if err != nil {
return nil, err
}
signature := git.Signature{
Name: obj.Spec.GitSpec.Commit.Author.Name,
Email: obj.Spec.GitSpec.Commit.Author.Email,
When: time.Now(),
}
var rev string
var commitErr error
rev, commitErr = sm.gitClient.Commit(
git.Commit{
Author: signature,
Message: commitMsg,
},
repository.WithSigner(sm.srcCfg.signingEntity),
)
if commitErr != nil {
if !errors.Is(commitErr, git.ErrNoStagedFiles) {
return nil, commitErr
}
log.FromContext(ctx).Info("no changes made in the source; no commit")
return nil, nil
}
// Push the commit to push branch.
gitOpCtx, cancel := context.WithTimeout(ctx, sm.srcCfg.timeout.Duration)
defer cancel()
pushConfig := repository.PushConfig{}
for _, po := range pushOptions {
po(&pushConfig)
}
if err := sm.gitClient.Push(gitOpCtx, pushConfig); err != nil {
return nil, err
}
tracelog.Info("pushed commit to push branch", "revision", rev, "branch", sm.srcCfg.pushBranch)
// Push to any provided refspec.
if obj.Spec.GitSpec.HasRefspec() {
pushConfig.Refspecs = append(pushConfig.Refspecs, obj.Spec.GitSpec.Push.Refspec)
if err := sm.gitClient.Push(gitOpCtx, pushConfig); err != nil {
return nil, err
}
tracelog.Info("pushed commit to refspec", "revision", rev, "refspecs", pushConfig.Refspecs)
}
// Construct the result of the push operation and return.
prOpts := []PushResultOption{WithPushResultRefspec(pushConfig.Refspecs)}
if sm.srcCfg.switchBranch {
prOpts = append(prOpts, WithPushResultSwitchBranch())
}
return NewPushResult(sm.srcCfg.pushBranch, rev, commitMsg, prOpts...)
}
// templateMsg renders a msg template, returning the message or an error.
func templateMsg(messageTemplate string, templateValues *TemplateData) (string, error) {
if messageTemplate == "" {
messageTemplate = defaultMessageTemplate
}
// Includes only functions that are guaranteed to always evaluate to the same result for given input.
// This removes the possibility of accidentally relying on where or when the template runs.
// https://github.com/Masterminds/sprig/blob/3ac42c7bc5e4be6aa534e036fb19dde4a996da2e/functions.go#L70
t, err := template.New("commit message").Funcs(sprig.HermeticTxtFuncMap()).Parse(messageTemplate)
if err != nil {
return "", fmt.Errorf("unable to create commit message template from spec: %w", err)
}
b := &strings.Builder{}
if err := t.Execute(b, *templateValues); err != nil {
return "", fmt.Errorf("failed to run template from spec: %w", err)
}
return b.String(), nil
}
// PushResultOption allows configuring the options of PushResult.
type PushResultOption func(*PushResult)
// WithPushResultSwitchBranch marks the PushResult with switchBranch.
func WithPushResultSwitchBranch() func(*PushResult) {
return func(pr *PushResult) {
pr.switchBranch = true
}
}
// WithPushResultRefspec sets the refspecs in the PushResult.
func WithPushResultRefspec(refspecs []string) func(*PushResult) {
return func(pr *PushResult) {
pr.refspecs = append(pr.refspecs, refspecs...)
}
}
// PushResult is the result of a push operation.
type PushResult struct {
commit *git.Commit
switchBranch bool
branch string
refspecs []string
creationTime *metav1.Time
}
// NewPushResult returns a new PushResult.
func NewPushResult(branch string, rev string, commitMsg string, opts ...PushResultOption) (*PushResult, error) {
if rev == "" {
return nil, errors.New("empty push commit revision")
}
pr := &PushResult{}
for _, o := range opts {
o(pr)
}
pr.commit = &git.Commit{
Hash: git.ExtractHashFromRevision(rev),
Reference: plumbing.NewBranchReferenceName(branch).String(),
Message: commitMsg,
}
pr.branch = branch
pr.creationTime = &metav1.Time{Time: time.Now()}
return pr, nil
}
// Commit returns the revision of the pushed commit.
func (pr PushResult) Commit() *git.Commit {
return pr.commit
}
// Time returns the time at which the push was performed.
func (pr PushResult) Time() *metav1.Time {
return pr.creationTime
}
// SwitchBranch returns if the source has different checkout and push branch.
func (pr PushResult) SwitchBranch() bool {
return pr.switchBranch
}
// Summary returns a summary of the PushResult.
func (pr PushResult) Summary() string {
var summary strings.Builder
shortCommitHash := pr.Commit().Hash.String()
if len(shortCommitHash) > 7 {
shortCommitHash = shortCommitHash[:7]
}
summary.WriteString(fmt.Sprintf("pushed commit '%s' to branch '%s'", shortCommitHash, pr.branch))
if len(pr.refspecs) > 0 {
summary.WriteString(fmt.Sprintf(" and refspecs '%s'", strings.Join(pr.refspecs, "', '")))
}
if pr.Commit().Message != "" {
summary.WriteString(fmt.Sprintf("\n%s", pr.Commit().Message))
}
return summary.String()
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/hashicorp/go-retryablehttp"
"golang.org/x/text/cases"
"golang.org/x/text/language"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
type Alertmanager struct {
URL string
ProxyURL string
CertPool *x509.CertPool
Token string
}
type AlertManagerAlert struct {
Status string `json:"status"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
StartsAt AlertManagerTime `json:"startsAt"`
EndsAt AlertManagerTime `json:"endsAt,omitempty"`
}
// AlertManagerTime takes care of representing time.Time as RFC3339.
// See https://prometheus.io/docs/alerting/0.27/clients/
type AlertManagerTime time.Time
func (a AlertManagerTime) String() string {
return time.Time(a).Format(time.RFC3339)
}
func (a AlertManagerTime) MarshalJSON() ([]byte, error) {
return json.Marshal(a.String())
}
func (a *AlertManagerTime) UnmarshalJSON(jsonRepr []byte) error {
var serializedTime string
if err := json.Unmarshal(jsonRepr, &serializedTime); err != nil {
return err
}
t, err := time.Parse(time.RFC3339, serializedTime)
if err != nil {
return err
}
*a = AlertManagerTime(t)
return nil
}
func NewAlertmanager(hookURL string, proxyURL string, certPool *x509.CertPool, token string) (*Alertmanager, error) {
_, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Alertmanager URL %s: '%w'", hookURL, err)
}
return &Alertmanager{
URL: hookURL,
ProxyURL: proxyURL,
CertPool: certPool,
Token: token,
}, nil
}
func (s *Alertmanager) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
annotations := make(map[string]string)
annotations["message"] = event.Message
_, ok := event.Metadata["summary"]
if ok {
annotations["summary"] = event.Metadata["summary"]
delete(event.Metadata, "summary")
}
var labels = make(map[string]string)
if event.Metadata != nil {
labels = event.Metadata
}
labels["alertname"] = "Flux" + event.InvolvedObject.Kind + cases.Title(language.Und).String(event.Reason)
labels["severity"] = event.Severity
labels["reason"] = event.Reason
labels["kind"] = event.InvolvedObject.Kind
labels["name"] = event.InvolvedObject.Name
labels["namespace"] = event.InvolvedObject.Namespace
labels["reportingcontroller"] = event.ReportingController
// The best reasonable `endsAt` value would be multiplying
// InvolvedObject's reconciliation interval by 2 then adding that to
// `startsAt` (the next successful reconciliation would make sure
// the alert is cleared after the timeout). Due to
// event.InvolvedObject only containing the object reference (namely
// the GVKNN) best we can do is leave it unset up to Alertmanager's
// default `resolve_timeout`.
//
// https://prometheus.io/docs/alerting/0.27/configuration/#file-layout-and-global-settings
startsAt := AlertManagerTime(event.Timestamp.Time)
payload := []AlertManagerAlert{
{
Labels: labels,
Annotations: annotations,
Status: "firing",
StartsAt: startsAt,
},
}
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if s.Token != "" {
opts = append(opts, withRequestModifier(func(request *retryablehttp.Request) {
request.Header.Add("Authorization", "Bearer "+s.Token)
}))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"strings"
"github.com/microsoft/azure-devops-go-api/azuredevops/v6"
"github.com/microsoft/azure-devops-go-api/azuredevops/v6/git"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
)
const genre string = "fluxcd"
type azureDevOpsClient interface {
CreateCommitStatus(context.Context, git.CreateCommitStatusArgs) (*git.GitStatus, error)
GetStatuses(context.Context, git.GetStatusesArgs) (*[]git.GitStatus, error)
}
// AzureDevOps is an Azure DevOps notifier.
type AzureDevOps struct {
Project string
Repo string
CommitStatus string
Client azureDevOpsClient
}
// NewAzureDevOps creates and returns a new AzureDevOps notifier.
func NewAzureDevOps(commitStatus string, addr string, token string, certPool *x509.CertPool) (*AzureDevOps, error) {
if len(token) == 0 {
return nil, errors.New("azure devops token cannot be empty")
}
host, id, err := parseGitAddress(addr)
if err != nil {
return nil, err
}
// this should never happen
if commitStatus == "" {
return nil, errors.New("commit status cannot be empty")
}
comp := strings.Split(id, "/")
if len(comp) != 4 {
return nil, fmt.Errorf("invalid repository id %q", id)
}
org := comp[0]
proj := comp[1]
repo := comp[3]
orgURL := fmt.Sprintf("%v/%v", host, org)
connection := azuredevops.NewPatConnection(orgURL, token)
if certPool != nil {
connection.TlsConfig = &tls.Config{
RootCAs: certPool,
}
}
client := connection.GetClientByUrl(orgURL)
gitClient := &git.ClientImpl{
Client: *client,
}
return &AzureDevOps{
Project: proj,
Repo: repo,
CommitStatus: commitStatus,
Client: gitClient,
}, nil
}
// Post Azure DevOps commit status
func (a AzureDevOps) Post(ctx context.Context, event eventv1.Event) error {
// Skip progressing events
if event.HasReason(meta.ProgressingReason) {
return nil
}
revString, ok := event.GetRevision()
if !ok {
return errors.New("missing revision metadata")
}
rev, err := parseRevision(revString)
if err != nil {
return err
}
state, err := toAzureDevOpsState(event.Severity)
if err != nil {
return err
}
// Check if the exact status is already set
g := commitStatusGenre(event)
_, desc := formatNameAndDescription(event)
id := a.CommitStatus
createArgs := git.CreateCommitStatusArgs{
Project: &a.Project,
RepositoryId: &a.Repo,
CommitId: &rev,
GitCommitStatusToCreate: &git.GitStatus{
Description: &desc,
State: &state,
Context: &git.GitStatusContext{
Genre: &g,
Name: &id,
},
},
}
getArgs := git.GetStatusesArgs{
Project: &a.Project,
RepositoryId: &a.Repo,
CommitId: &rev,
}
statuses, err := a.Client.GetStatuses(ctx, getArgs)
if err != nil {
return fmt.Errorf("could not list commit statuses: %w", err)
}
if duplicateAzureDevOpsStatus(statuses, createArgs.GitCommitStatusToCreate) {
return nil
}
// Create a new status
_, err = a.Client.CreateCommitStatus(ctx, createArgs)
if err != nil {
return fmt.Errorf("could not create commit status: %w", err)
}
return nil
}
func toAzureDevOpsState(severity string) (git.GitStatusState, error) {
switch severity {
case eventv1.EventSeverityInfo:
return git.GitStatusStateValues.Succeeded, nil
case eventv1.EventSeverityError:
return git.GitStatusStateValues.Error, nil
default:
return "", errors.New("can't convert to azure devops state")
}
}
// duplicateStatus return true if the latest status
// with a matching context has the same state and description
func duplicateAzureDevOpsStatus(statuses *[]git.GitStatus, status *git.GitStatus) bool {
if status == nil || status.Context == nil || statuses == nil {
return false
}
for _, s := range *statuses {
if s.Context == nil || s.Context.Name == nil || s.Context.Genre == nil ||
s.State == nil || s.Description == nil {
continue
}
if *s.Context.Name == *status.Context.Name && *s.Context.Genre == *status.Context.Genre {
if *s.State == *status.State && *s.Description == *status.Description {
return true
}
return false
}
}
return false
}
func commitStatusGenre(event eventv1.Event) string {
summary, ok := event.Metadata["summary"]
if ok {
return fmt.Sprintf("%s:%s", genre, summary)
}
return genre
}
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"github.com/Azure/azure-amqp-common-go/v4/auth"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
pkgauth "github.com/fluxcd/pkg/auth"
"github.com/fluxcd/pkg/auth/azure"
"github.com/fluxcd/pkg/cache"
pkgcache "github.com/fluxcd/pkg/cache"
"github.com/fluxcd/notification-controller/api/v1beta3"
)
// AzureEventHub holds the eventhub client
type AzureEventHub struct {
Hub *eventhub.Hub
}
// NewAzureEventHub creates a eventhub client
func NewAzureEventHub(ctx context.Context, endpointURL, token, eventHubNamespace, proxy, serviceAccountName, providerName, providerNamespace string, tokenClient client.Client, tokenCache *pkgcache.TokenCache) (*AzureEventHub, error) {
var hub *eventhub.Hub
var err error
if err := validateAuthOptions(endpointURL, token, serviceAccountName); err != nil {
return nil, fmt.Errorf("invalid authentication options: %v", err)
}
if isSASAuth(endpointURL) {
hub, err = newSASHub(endpointURL)
if err != nil {
return nil, fmt.Errorf("failed to create a eventhub using SAS %v", err)
}
} else {
// if token doesn't exist, try to create a new token using managed identity
if token == "" {
token, err = newManagedIdentityToken(ctx, proxy, serviceAccountName, providerName, providerNamespace, tokenClient, tokenCache)
if err != nil {
return nil, fmt.Errorf("failed to create a eventhub using managed identity %v", err)
}
}
hub, err = newJWTHub(endpointURL, token, eventHubNamespace)
if err != nil {
return nil, fmt.Errorf("failed to create a eventhub using authentication token %v", err)
}
}
return &AzureEventHub{
Hub: hub,
}, nil
}
// Post all notification-controller messages to EventHub
func (e *AzureEventHub) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
eventBytes, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("unable to marshall event: %w", err)
}
err = e.Hub.Send(ctx, eventhub.NewEvent(eventBytes))
if err != nil {
return fmt.Errorf("failed to send msg: %w", err)
}
err = e.Hub.Close(ctx)
if err != nil {
return fmt.Errorf("unable to close connection: %w", err)
}
return nil
}
// PureJWT just contains the jwt
type PureJWT struct {
jwt string
}
// NewJWTProvider create a pureJWT method
func NewJWTProvider(jwt string) *PureJWT {
return &PureJWT{
jwt: jwt,
}
}
// GetToken uses a JWT token, we assume that we will get new tokens when needed, thus no Expiry defined
func (j *PureJWT) GetToken(uri string) (*auth.Token, error) {
return &auth.Token{
TokenType: auth.CBSTokenTypeJWT,
Token: j.jwt,
Expiry: "",
}, nil
}
// newJWTHub used when address is a JWT token
func newJWTHub(eventhubName, token, eventHubNamespace string) (*eventhub.Hub, error) {
provider := NewJWTProvider(token)
hub, err := eventhub.NewHub(eventHubNamespace, eventhubName, provider)
if err != nil {
return nil, err
}
return hub, nil
}
// newSASHub used when address is a SAS ConnectionString
func newSASHub(address string) (*eventhub.Hub, error) {
hub, err := eventhub.NewHubFromConnectionString(address)
if err != nil {
return nil, err
}
return hub, nil
}
// newManagedIdentityToken is used to attempt credential-free authentication.
func newManagedIdentityToken(ctx context.Context, proxy, serviceAccountName, providerName, providerNamespace string, tokenClient client.Client, tokenCache *pkgcache.TokenCache) (string, error) {
opts := []pkgauth.Option{pkgauth.WithScopes(azure.ScopeEventHubs)}
if proxy != "" {
proxyURL, err := url.Parse(proxy)
if err != nil {
return "", fmt.Errorf("error parsing proxy URL : %w", err)
}
opts = append(opts, pkgauth.WithProxyURL(*proxyURL))
}
if serviceAccountName != "" {
serviceAccount := types.NamespacedName{
Name: serviceAccountName,
Namespace: providerNamespace,
}
opts = append(opts, pkgauth.WithServiceAccount(serviceAccount, tokenClient))
}
if tokenCache != nil {
involvedObject := cache.InvolvedObject{
Kind: v1beta3.ProviderKind,
Name: providerName,
Namespace: providerNamespace,
Operation: OperationPost,
}
opts = append(opts, pkgauth.WithCache(*tokenCache, involvedObject))
}
token, err := pkgauth.GetToken(ctx, azure.Provider{}, opts...)
if err != nil {
return "", fmt.Errorf("failed to get token for azure event hub: %w", err)
}
return token.(*azure.Token).AccessToken.Token, nil
}
// validateAuthOptions checks if the authentication options are valid
func validateAuthOptions(endpointURL, token, serviceAccountName string) error {
if isSASAuth(endpointURL) {
if err := validateSASAuth(token, serviceAccountName); err != nil {
return err
}
} else if serviceAccountName != "" && token != "" {
return fmt.Errorf("serviceAccountName and jwt token authentication cannot be set at the same time")
}
return nil
}
// isSASAuth checks if the endpoint URL contains SAS authentication parameters
func isSASAuth(endpointURL string) bool {
return strings.Contains(endpointURL, "SharedAccessKey")
}
// validateSASAuth checks if SAS authentication is used correctly
func validateSASAuth(token, serviceAccountName string) error {
if serviceAccountName != "" {
return fmt.Errorf("serviceAccountName and SAS authentication cannot be set at the same time")
}
if token != "" {
return fmt.Errorf("jwt token and SAS authentication cannot be set at the same time")
}
return nil
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"github.com/ktrysmt/go-bitbucket"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
)
// Bitbucket is a Bitbucket Server notifier.
type Bitbucket struct {
Owner string
Repo string
CommitStatus string
Client *bitbucket.Client
}
// NewBitbucket creates and returns a new Bitbucket notifier.
func NewBitbucket(commitStatus string, addr string, token string, certPool *x509.CertPool) (*Bitbucket, error) {
if len(token) == 0 {
return nil, errors.New("bitbucket token cannot be empty")
}
// this should never happen
if commitStatus == "" {
return nil, errors.New("commit status cannot be empty")
}
_, id, err := parseGitAddress(addr)
if err != nil {
return nil, err
}
comp := strings.Split(token, ":")
if len(comp) != 2 {
return nil, errors.New("invalid token format, expected to be <user>:<password>")
}
username := comp[0]
password := comp[1]
comp = strings.Split(id, "/")
if len(comp) != 2 {
return nil, fmt.Errorf("invalid repository id %q", id)
}
owner := comp[0]
repo := comp[1]
client := bitbucket.NewBasicAuth(username, password)
if certPool != nil {
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
},
}
hc := &http.Client{Transport: tr}
client.HttpClient = hc
}
return &Bitbucket{
Owner: owner,
Repo: repo,
CommitStatus: commitStatus,
Client: client,
}, nil
}
// Post Bitbucket commit status
func (b Bitbucket) Post(ctx context.Context, event eventv1.Event) error {
// Skip progressing events
if event.HasReason(meta.ProgressingReason) {
return nil
}
revString, ok := event.GetRevision()
if !ok {
return errors.New("missing revision metadata")
}
rev, err := parseRevision(revString)
if err != nil {
return err
}
state, err := toBitbucketState(event.Severity)
if err != nil {
return err
}
name, desc := formatNameAndDescription(event)
id := b.CommitStatus
// key has a limitation of 40 characters in bitbucket api
key := sha1String(id)
cmo := &bitbucket.CommitsOptions{
Owner: b.Owner,
RepoSlug: b.Repo,
Revision: rev,
}
cso := &bitbucket.CommitStatusOptions{
State: state,
Key: key,
Name: name,
Description: desc,
Url: "https://bitbucket.org",
}
existingCommitStatus, err := b.Client.Repositories.Commits.GetCommitStatus(cmo, cso.Key)
var statusErr *bitbucket.UnexpectedResponseStatusError
if err != nil && !(errors.As(err, &statusErr) && strings.Contains(statusErr.Status, http.StatusText(http.StatusNotFound))) {
return fmt.Errorf("could not get commit status: %v", err)
}
dupe, err := duplicateBitbucketStatus(existingCommitStatus, cso)
if err != nil {
return err
}
if dupe {
return nil
}
_, err = b.Client.Repositories.Commits.CreateCommitStatus(cmo, cso)
if err != nil {
return err
}
return nil
}
func duplicateBitbucketStatus(statuses interface{}, status *bitbucket.CommitStatusOptions) (bool, error) {
commitStatus := bitbucket.CommitStatusOptions{}
b, err := json.Marshal(statuses)
if err != nil {
return false, err
}
err = json.Unmarshal(b, &commitStatus)
if err != nil {
return false, err
}
if commitStatus.State == status.State && commitStatus.Description == status.Description {
return true, nil
}
return false, nil
}
func toBitbucketState(severity string) (string, error) {
switch severity {
case eventv1.EventSeverityInfo:
return "SUCCESSFUL", nil
case eventv1.EventSeverityError:
return "FAILED", nil
default:
return "", errors.New("can't convert to bitbucket state")
}
}
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/hashicorp/go-retryablehttp"
)
// BitbucketServer is a notifier for BitBucket Server and Data Center.
type BitbucketServer struct {
CommitStatus string
Url *url.URL
ProviderAddress string
Username string
Password string
Token string
Client *retryablehttp.Client
}
const (
bbServerEndPointCommitsTmpl = "%[1]s/rest/api/latest/projects/%[2]s/repos/%[3]s/commits"
bbServerEndPointBuildsTmpl = "%[1]s/builds"
bbServerGetBuildStatusQueryString = "key"
bbServerSourceCodeMgmtString = "/scm/"
)
type bbServerBuildStatus struct {
Name string `json:"name,omitempty"`
Key string `json:"key,omitempty"`
Parent string `json:"parent,omitempty"`
State string `json:"state,omitempty"`
Ref string `json:"ref,omitempty"`
BuildNumber string `json:"buildNumber,omitempty"`
Description string `json:"description,omitempty"`
Duration int64 `json:"duration,omitempty"`
UpdatedDate int64 `json:"updatedDate,omitempty"`
CreatedDate int64 `json:"createdDate,omitempty"`
Url string `json:"url,omitempty"`
}
type bbServerBuildStatusSetRequest struct {
BuildNumber string `json:"buildNumber,omitempty"`
Description string `json:"description,omitempty"`
Duration int64 `json:"duration,omitempty"`
Key string `json:"key"`
LastUpdated int64 `json:"lastUpdated,omitempty"`
Name string `json:"name,omitempty"`
Parent string `json:"parent,omitempty"`
Ref string `json:"ref,omitempty"`
State string `json:"state"`
Url string `json:"url"`
}
// NewBitbucketServer creates and returns a new BitbucketServer notifier.
func NewBitbucketServer(commitStatus string, addr string, token string, certPool *x509.CertPool, username string, password string) (*BitbucketServer, error) {
url, err := parseBitbucketServerGitAddress(addr)
if err != nil {
return nil, err
}
// this should never happen
if commitStatus == "" {
return nil, errors.New("commit status cannot be empty")
}
httpClient := retryablehttp.NewClient()
if certPool != nil {
httpClient.HTTPClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
},
}
}
httpClient.HTTPClient.Timeout = 15 * time.Second
httpClient.RetryWaitMin = 2 * time.Second
httpClient.RetryWaitMax = 30 * time.Second
httpClient.RetryMax = 4
httpClient.Logger = nil
if len(token) == 0 && (len(username) == 0 || len(password) == 0) {
return nil, errors.New("invalid credentials, expected to be one of username/password or API Token")
}
return &BitbucketServer{
CommitStatus: commitStatus,
Url: url,
ProviderAddress: addr,
Token: token,
Username: username,
Password: password,
Client: httpClient,
}, nil
}
// Post Bitbucket Server build status
func (b BitbucketServer) Post(ctx context.Context, event eventv1.Event) error {
// Skip progressing events
if event.HasReason(meta.ProgressingReason) {
return nil
}
revString, ok := event.GetRevision()
if !ok {
return errors.New("missing revision metadata")
}
rev, err := parseRevision(revString)
if err != nil {
return fmt.Errorf("could not parse revision: %w", err)
}
state, err := b.state(event.Severity)
if err != nil {
return fmt.Errorf("couldn't convert to bitbucket server state: %w", err)
}
name, desc := formatNameAndDescription(event)
name = name + " [" + desc + "]" //Bitbucket server displays this data on browser. Thus adding description here.
id := b.CommitStatus
// key has a limitation of 40 characters in bitbucket api
key := sha1String(id)
u := b.Url.JoinPath(b.createBuildPath(rev)).String()
dupe, err := b.duplicateBitbucketServerStatus(ctx, state, name, desc, key, u)
if err != nil {
return fmt.Errorf("could not get existing commit status: %w", err)
}
if !dupe {
_, err = b.postBuildStatus(ctx, state, name, desc, key, u)
if err != nil {
return fmt.Errorf("could not post build status: %w", err)
}
}
return nil
}
func (b BitbucketServer) state(severity string) (string, error) {
switch severity {
case eventv1.EventSeverityInfo:
return "SUCCESSFUL", nil
case eventv1.EventSeverityError:
return "FAILED", nil
default:
return "", errors.New("bitbucket server state generated on info or error events only")
}
}
func (b BitbucketServer) duplicateBitbucketServerStatus(ctx context.Context, state, name, desc, key, u string) (bool, error) {
// Prepare request object
req, err := b.prepareCommonRequest(ctx, u, nil, http.MethodGet)
if err != nil {
return false, fmt.Errorf("could not check duplicate commit status: %w", err)
}
// Set query string
q := url.Values{}
q.Add(bbServerGetBuildStatusQueryString, key)
req.URL.RawQuery = q.Encode()
// Make a GET call
d, err := b.Client.Do(req)
if err != nil {
return false, fmt.Errorf("failed api call to check duplicate commit status: %w", err)
}
if d != nil && isError(d) && d.StatusCode != http.StatusNotFound {
defer d.Body.Close()
return false, fmt.Errorf("failed api call to check duplicate commit status: %d - %s", d.StatusCode, http.StatusText(d.StatusCode))
}
defer d.Body.Close()
if d.StatusCode == http.StatusOK {
bd, err := io.ReadAll(d.Body)
if err != nil {
return false, fmt.Errorf("could not read response body for duplicate commit status: %w", err)
}
var existingCommitStatus bbServerBuildStatus
err = json.Unmarshal(bd, &existingCommitStatus)
if err != nil {
return false, fmt.Errorf("could not unmarshal json response body for duplicate commit status: %w", err)
}
// Do not post duplicate build status
if existingCommitStatus.Key == key && existingCommitStatus.State == state && existingCommitStatus.Description == desc && existingCommitStatus.Name == name {
return true, nil
}
}
return false, nil
}
func (b BitbucketServer) postBuildStatus(ctx context.Context, state, name, desc, key, url string) (*http.Response, error) {
//Prepare json body
j := &bbServerBuildStatusSetRequest{
Key: key,
State: state,
Url: b.ProviderAddress,
Description: desc,
Name: name,
}
p := new(bytes.Buffer)
err := json.NewEncoder(p).Encode(j)
if err != nil {
return nil, fmt.Errorf("failed preparing request for post build commit status, could not encode request body to json: %w", err)
}
//Prepare request
req, err := b.prepareCommonRequest(ctx, url, p, http.MethodPost)
if err != nil {
return nil, fmt.Errorf("failed preparing request for post build commit status: %w", err)
}
// Add Content type header
req.Header.Add("Content-Type", "application/json")
// Make a POST call
resp, err := b.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("could not post build commit status: %w", err)
}
// Note: A non-2xx status code doesn't cause an error: https://pkg.go.dev/net/http#Client.Do
if isError(resp) {
defer resp.Body.Close()
return nil, fmt.Errorf("could not post build commit status: %d - %s", resp.StatusCode, http.StatusText(resp.StatusCode))
}
defer resp.Body.Close()
return resp, nil
}
func (b BitbucketServer) createBuildPath(rev string) string {
return fmt.Sprintf(bbServerEndPointBuildsTmpl, rev)
}
func parseBitbucketServerGitAddress(s string) (*url.URL, error) {
u, err := url.Parse(s)
if err != nil {
return nil, fmt.Errorf("could not parse git address: %w", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("could not parse git address: unsupported scheme type in address: %s. Must be http or https", u.Scheme)
}
idWithContext := strings.TrimSuffix(u.Path, ".git")
// /scm/ is always part of http/https clone urls : https://community.atlassian.com/t5/Bitbucket-questions/remote-url-in-Bitbucket-server-what-does-scm-represent-is-it/qaq-p/2060987
lastIndex := strings.LastIndex(idWithContext, bbServerSourceCodeMgmtString)
if lastIndex < 0 {
return nil, fmt.Errorf("could not parse git address: supplied provider address is not http(s) git clone url")
}
// Handle context scenarios --> https://confluence.atlassian.com/bitbucketserver/change-bitbucket-s-context-path-776640153.html
cntxtPath := idWithContext[:lastIndex] // Context path is anything that comes before last /scm/
id := idWithContext[lastIndex+len(bbServerSourceCodeMgmtString):] // Remove last `/scm/` from id as it is not used in API calls
comp := strings.Split(id, "/")
if len(comp) != 2 {
return nil, fmt.Errorf("could not parse git address: invalid repository id %q", id)
}
projectkey := comp[0]
reposlug := comp[1]
// Update the path till commits endpoint. The final builds endpoint would be added in Post function.
u.Path = fmt.Sprintf(bbServerEndPointCommitsTmpl, cntxtPath, projectkey, reposlug)
return u, nil
}
func (b BitbucketServer) prepareCommonRequest(ctx context.Context, path string, body io.Reader, method string) (*retryablehttp.Request, error) {
req, err := retryablehttp.NewRequestWithContext(ctx, method, path, body)
if err != nil {
return nil, fmt.Errorf("could not prepare request: %w", err)
}
if b.Token != "" {
req.Header.Set("Authorization", "Bearer "+b.Token)
} else {
req.Header.Add("Authorization", "Basic "+basicAuth(b.Username, b.Password))
}
req.Header.Add("x-atlassian-token", "no-check")
req.Header.Add("x-requested-with", "XMLHttpRequest")
return req, nil
}
// isError method returns true if HTTP status `code >= 400` otherwise false.
func isError(r *http.Response) bool {
return r.StatusCode > 399
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"runtime"
"time"
"github.com/hashicorp/go-retryablehttp"
)
type postOptions struct {
proxy string
certPool *x509.CertPool
requestModifier func(*retryablehttp.Request)
responseValidator func(statusCode int, body []byte) error
}
type postOption func(*postOptions)
func postMessage(ctx context.Context, address string, payload interface{}, opts ...postOption) error {
options := &postOptions{
// Default validateResponse function verifies that the response status code is 200, 202 or 201.
responseValidator: func(statusCode int, body []byte) error {
if statusCode == http.StatusOK ||
statusCode == http.StatusAccepted ||
statusCode == http.StatusCreated {
return nil
}
return fmt.Errorf("request failed with status code %d, %s", statusCode, string(body))
},
}
for _, o := range opts {
o(options)
}
httpClient, err := newHTTPClient(options)
if err != nil {
return err
}
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
}
req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPost, address, data)
if err != nil {
return fmt.Errorf("failed to create a new request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if options.requestModifier != nil {
options.requestModifier(req)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
if err := options.responseValidator(resp.StatusCode, body); err != nil {
return fmt.Errorf("request failed: %w", err)
}
return nil
}
func withProxy(proxy string) postOption {
return func(opts *postOptions) {
opts.proxy = proxy
}
}
func withCertPool(certPool *x509.CertPool) postOption {
return func(opts *postOptions) {
opts.certPool = certPool
}
}
func withRequestModifier(reqModifier func(*retryablehttp.Request)) postOption {
return func(opts *postOptions) {
opts.requestModifier = reqModifier
}
}
func withResponseValidator(respValidator func(statusCode int, body []byte) error) postOption {
return func(opts *postOptions) {
opts.responseValidator = respValidator
}
}
func newHTTPClient(opts *postOptions) (*retryablehttp.Client, error) {
httpClient := retryablehttp.NewClient()
if opts.certPool != nil {
httpClient.HTTPClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: opts.certPool,
},
}
}
if opts.proxy != "" {
proxyURL, err := url.Parse(opts.proxy)
if err != nil {
return nil, fmt.Errorf("unable to parse proxy URL: %w", err)
}
var tlsConfig *tls.Config
if opts.certPool != nil {
tlsConfig = &tls.Config{
RootCAs: opts.certPool,
}
}
httpClient.HTTPClient.Transport = &http.Transport{
Proxy: http.ProxyURL(proxyURL),
TLSClientConfig: tlsConfig,
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
MaxIdleConnsPerHost: runtime.GOMAXPROCS(0) + 1,
}
}
// Disable the timeout for the HTTP client,
// as we set the provider timeout on the context.
httpClient.HTTPClient.Timeout = 0
httpClient.RetryWaitMin = 2 * time.Second
httpClient.RetryWaitMax = 30 * time.Second
httpClient.RetryMax = 4
httpClient.Logger = nil
return httpClient, nil
}
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/DataDog/datadog-api-client-go/v2/api/datadog"
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
type DataDog struct {
apiClient *datadog.APIClient
eventsApi *datadogV1.EventsApi
apiKey string
}
// NewDataDog creates a new DataDog provider by mapping the notification provider API to sensible values for the DataDog API.
// url: The DataDog API endpoint to use. Examples: https://api.datadoghq.com, https://api.datadoghq.eu, etc.
// token: The DataDog API key (not the application key).
// headers: A map of extra tags to add to the event
func NewDataDog(address string, proxyUrl string, certPool *x509.CertPool, token string) (*DataDog, error) {
conf := datadog.NewConfiguration()
if token == "" {
return nil, fmt.Errorf("token cannot be empty")
}
baseUrl, err := url.Parse(address)
if err != nil {
return nil, fmt.Errorf("failed to parse address %q: %w", address, err)
}
conf.Host = baseUrl.Host
conf.Scheme = baseUrl.Scheme
if proxyUrl != "" || certPool != nil {
transport := &http.Transport{}
if proxyUrl != "" {
proxy, err := url.Parse(proxyUrl)
if err != nil {
return nil, fmt.Errorf("failed to parse proxy URL %q: %w", proxyUrl, err)
}
transport.Proxy = http.ProxyURL(proxy)
}
if certPool != nil {
transport.TLSClientConfig = &tls.Config{
RootCAs: certPool,
}
}
conf.HTTPClient = &http.Client{
Transport: transport,
}
}
apiClient := datadog.NewAPIClient(conf)
eventsApi := datadogV1.NewEventsApi(apiClient)
return &DataDog{
apiClient: apiClient,
eventsApi: eventsApi,
apiKey: token,
}, nil
}
func (d *DataDog) Post(ctx context.Context, event eventv1.Event) error {
dataDogEvent := d.toDataDogEvent(&event)
_, _, err := d.eventsApi.CreateEvent(d.dataDogCtx(ctx), dataDogEvent)
if err != nil {
return fmt.Errorf("failed to post event to DataDog: %w", err)
}
return nil
}
// dataDogCtx returns a context with the DataDog API key set.
// This is one way to authenticate with the DataDog API.
func (d *DataDog) dataDogCtx(ctx context.Context) context.Context {
return context.WithValue(ctx, datadog.ContextAPIKeys, map[string]datadog.APIKey{
"apiKeyAuth": {
Key: d.apiKey,
},
})
}
// toDataDogEvent converts an eventv1.Event to a datadogV1.EventCreateRequest.
func (d *DataDog) toDataDogEvent(event *eventv1.Event) datadogV1.EventCreateRequest {
return datadogV1.EventCreateRequest{
// Note: Title's printf format matches other events from datadog's kubernetes integration
Title: fmt.Sprintf("Events from the %s %s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Name, event.InvolvedObject.Namespace),
Text: event.Message,
Tags: d.toDataDogTags(event),
// fluxcd matches the name datadog picked for their flux integration: https://docs.datadoghq.com/integrations/fluxcd/
SourceTypeName: strPtr("fluxcd"),
DateHappened: int64Ptr(event.Timestamp.Unix()),
AlertType: toDataDogAlertType(event),
}
}
// toDataDogTags parses an eventv1.Event to return a slice of tags.
// We set kind, name, and namespace to the appropriate values of the involved object.
func (d *DataDog) toDataDogTags(event *eventv1.Event) []string {
// Note: Datadog's built in kubernetes tagging is documented here: https://docs.datadoghq.com/containers/kubernetes/tag/?tab=containerizedagent#out-of-the-box-tags
tags := []string{
fmt.Sprintf("flux_reporting_controller:%s", event.ReportingController),
fmt.Sprintf("flux_reason:%s", event.Reason),
// Note: DataDog standardizes kubernetes tags as "kube_*": https://github.com/DataDog/datadog-agent/blob/82dc933aa86de037c70fe960384aa06a62e457a8/pkg/collector/corechecks/cluster/kubernetesapiserver/events_common.go#L48
fmt.Sprintf("kube_kind:%s", event.InvolvedObject.Kind),
fmt.Sprintf("kube_name:%s", event.InvolvedObject.Name),
fmt.Sprintf("kube_namespace:%s", event.InvolvedObject.Namespace),
}
// add extra tags from event metadata
for k, v := range event.Metadata {
tags = append(tags, fmt.Sprintf("%s:%s", k, v))
}
// Note: https://docs.datadoghq.com/getting_started/tagging/
// "Tags are converted to lowercase"
// To keep the events consistent, we run toLower on all input strings.
for idx := range tags {
tags[idx] = strings.ToLower(tags[idx])
}
return tags
}
// toDataDogAlertType parses an eventv1.Event to return a datadogV1.EventAlertType.
func toDataDogAlertType(event *eventv1.Event) *datadogV1.EventAlertType {
if event.Severity == eventv1.EventSeverityError {
return dataDogEventAlertTypePtr(datadogV1.EVENTALERTTYPE_ERROR)
}
return dataDogEventAlertTypePtr(datadogV1.EVENTALERTTYPE_INFO)
}
func dataDogEventAlertTypePtr(t datadogV1.EventAlertType) *datadogV1.EventAlertType {
return &t
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"fmt"
"net/url"
"path"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
// Discord holds the hook URL
type Discord struct {
URL string
ProxyURL string
Username string
Channel string
}
// NewDiscord validates the URL and returns a Discord object
func NewDiscord(hookURL string, proxyURL string, username string, channel string) (*Discord, error) {
webhook, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Discord hook URL %s: '%w'", hookURL, err)
}
// use Slack formatting
// https://birdie0.github.io/discord-webhooks-guide/other/slack_formatting.html
if !strings.HasSuffix(hookURL, "/slack") {
webhook.Path = path.Join(webhook.Path, "slack")
hookURL = webhook.String()
}
return &Discord{
Channel: channel,
URL: hookURL,
ProxyURL: proxyURL,
Username: username,
}, nil
}
// Post Discord message
func (s *Discord) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
payload := SlackPayload{
Username: s.Username,
}
if payload.Username == "" {
payload.Username = event.ReportingController
}
color := "good"
if event.Severity == eventv1.EventSeverityError {
color = "danger"
}
sfields := make([]SlackField, 0, len(event.Metadata))
for k, v := range event.Metadata {
sfields = append(sfields, SlackField{k, v, false})
}
a := SlackAttachment{
Color: color,
AuthorName: fmt.Sprintf("%s/%s.%s", strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace),
Text: event.Message,
MrkdwnIn: []string{"text"},
Fields: sfields,
}
payload.Attachments = []SlackAttachment{a}
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/client"
pkgcache "github.com/fluxcd/pkg/cache"
apiv1 "github.com/fluxcd/notification-controller/api/v1beta3"
)
var (
// notifiers is a map of notifier names to factory functions.
notifiers = notifierMap{
// GenericProvider is the default notifier
apiv1.GenericProvider: genericNotifierFunc,
apiv1.GenericHMACProvider: genericHMACNotifierFunc,
apiv1.SlackProvider: slackNotifierFunc,
apiv1.DiscordProvider: discordNotifierFunc,
apiv1.RocketProvider: rocketNotifierFunc,
apiv1.MSTeamsProvider: msteamsNotifierFunc,
apiv1.GoogleChatProvider: googleChatNotifierFunc,
apiv1.GooglePubSubProvider: googlePubSubNotifierFunc,
apiv1.WebexProvider: webexNotifierFunc,
apiv1.SentryProvider: sentryNotifierFunc,
apiv1.AzureEventHubProvider: azureEventHubNotifierFunc,
apiv1.TelegramProvider: telegramNotifierFunc,
apiv1.LarkProvider: larkNotifierFunc,
apiv1.Matrix: matrixNotifierFunc,
apiv1.OpsgenieProvider: opsgenieNotifierFunc,
apiv1.AlertManagerProvider: alertmanagerNotifierFunc,
apiv1.GrafanaProvider: grafanaNotifierFunc,
apiv1.PagerDutyProvider: pagerDutyNotifierFunc,
apiv1.DataDogProvider: dataDogNotifierFunc,
apiv1.NATSProvider: natsNotifierFunc,
apiv1.GitHubProvider: gitHubNotifierFunc,
apiv1.GitHubDispatchProvider: gitHubDispatchNotifierFunc,
apiv1.GitLabProvider: gitLabNotifierFunc,
apiv1.GiteaProvider: giteaNotifierFunc,
apiv1.BitbucketServerProvider: bitbucketServerNotifierFunc,
apiv1.BitbucketProvider: bitbucketNotifierFunc,
apiv1.AzureDevOpsProvider: azureDevOpsNotifierFunc,
}
)
// notifierMap is a map of provider names to notifier factory functions
type notifierMap map[string]factoryFunc
// factoryFunc is a factory function that creates a new notifier
type factoryFunc func(opts notifierOptions) (Interface, error)
type notifierOptions struct {
Context context.Context
URL string
ProxyURL string
Username string
Channel string
Token string
Headers map[string]string
CertPool *x509.CertPool
Password string
CommitStatus string
ProviderName string
ProviderNamespace string
SecretData map[string][]byte
ServiceAccountName string
TokenCache *pkgcache.TokenCache
TokenClient client.Client
}
type Factory struct {
notifierOptions
}
// Option represents a functional option for configuring a notifier.
type Option func(*notifierOptions)
// WithProxyURL sets the proxy URL for the notifier.
func WithProxyURL(url string) Option {
return func(o *notifierOptions) {
o.ProxyURL = url
}
}
// WithUsername sets the username for the notifier.
func WithUsername(username string) Option {
return func(o *notifierOptions) {
o.Username = username
}
}
// WithChannel sets the channel for the notifier.
func WithChannel(channel string) Option {
return func(o *notifierOptions) {
o.Channel = channel
}
}
// WithToken sets the token for the notifier.
func WithToken(token string) Option {
return func(o *notifierOptions) {
o.Token = token
}
}
// WithHeaders sets the headers for the notifier.
func WithHeaders(headers map[string]string) Option {
return func(o *notifierOptions) {
o.Headers = headers
}
}
// WithCertPool sets the certificate pool for the notifier.
func WithCertPool(certPool *x509.CertPool) Option {
return func(o *notifierOptions) {
o.CertPool = certPool
}
}
// WithPassword sets the password for the notifier.
func WithPassword(password string) Option {
return func(o *notifierOptions) {
o.Password = password
}
}
// WithCommitStatus sets the custom commit status for the notifier.
func WithCommitStatus(commitStatus string) Option {
return func(o *notifierOptions) {
o.CommitStatus = commitStatus
}
}
// WithProviderName sets the provider name for the notifier.
func WithProviderName(name string) Option {
return func(o *notifierOptions) {
o.ProviderName = name
}
}
// WithProviderNamespace sets the provider namespace for the notifier.
func WithProviderNamespace(namespace string) Option {
return func(o *notifierOptions) {
o.ProviderNamespace = namespace
}
}
// WithSecretData sets the secret data for the notifier.
func WithSecretData(data map[string][]byte) Option {
return func(o *notifierOptions) {
o.SecretData = data
}
}
// WithTokenCache sets the token cache for the notifier.
func WithTokenCache(cache *pkgcache.TokenCache) Option {
return func(o *notifierOptions) {
o.TokenCache = cache
}
}
// WithTokenClient sets the token client for the notifier.
func WithTokenClient(kubeClient client.Client) Option {
return func(o *notifierOptions) {
o.TokenClient = kubeClient
}
}
// WithServiceAccount sets the service account for the notifier.
func WithServiceAccount(serviceAccountName string) Option {
return func(o *notifierOptions) {
o.ServiceAccountName = serviceAccountName
}
}
// NewFactory creates a new notifier factory with the given URL and optional configurations.
func NewFactory(ctx context.Context, url string, opts ...Option) *Factory {
options := notifierOptions{
Context: ctx,
URL: url,
}
for _, opt := range opts {
opt(&options)
}
return &Factory{
notifierOptions: options,
}
}
func (f Factory) Notifier(provider string) (Interface, error) {
if f.URL == "" {
return &NopNotifier{}, nil
}
var (
n Interface
err error
)
if notifier, ok := notifiers[provider]; ok {
n, err = notifier(f.notifierOptions)
} else {
err = fmt.Errorf("provider %s not supported", provider)
}
if err != nil {
n = &NopNotifier{}
}
return n, err
}
func genericNotifierFunc(opts notifierOptions) (Interface, error) {
return NewForwarder(opts.URL, opts.ProxyURL, opts.Headers, opts.CertPool, nil)
}
func genericHMACNotifierFunc(opts notifierOptions) (Interface, error) {
return NewForwarder(opts.URL, opts.ProxyURL, opts.Headers, opts.CertPool, []byte(opts.Token))
}
func slackNotifierFunc(opts notifierOptions) (Interface, error) {
return NewSlack(opts.URL, opts.ProxyURL, opts.Token, opts.CertPool, opts.Username, opts.Channel)
}
func discordNotifierFunc(opts notifierOptions) (Interface, error) {
return NewDiscord(opts.URL, opts.ProxyURL, opts.Username, opts.Channel)
}
func rocketNotifierFunc(opts notifierOptions) (Interface, error) {
return NewRocket(opts.URL, opts.ProxyURL, opts.CertPool, opts.Username, opts.Channel)
}
func msteamsNotifierFunc(opts notifierOptions) (Interface, error) {
return NewMSTeams(opts.URL, opts.ProxyURL, opts.CertPool)
}
func googleChatNotifierFunc(opts notifierOptions) (Interface, error) {
return NewGoogleChat(opts.URL, opts.ProxyURL)
}
func googlePubSubNotifierFunc(opts notifierOptions) (Interface, error) {
return NewGooglePubSub(opts.URL, opts.Channel, opts.Token, opts.Headers)
}
func webexNotifierFunc(opts notifierOptions) (Interface, error) {
return NewWebex(opts.URL, opts.ProxyURL, opts.CertPool, opts.Channel, opts.Token)
}
func sentryNotifierFunc(opts notifierOptions) (Interface, error) {
return NewSentry(opts.CertPool, opts.URL, opts.Channel)
}
func azureEventHubNotifierFunc(opts notifierOptions) (Interface, error) {
return NewAzureEventHub(opts.Context, opts.URL, opts.Token, opts.Channel, opts.ProxyURL, opts.ServiceAccountName, opts.ProviderName, opts.ProviderNamespace, opts.TokenClient, opts.TokenCache)
}
func telegramNotifierFunc(opts notifierOptions) (Interface, error) {
return NewTelegram(opts.Channel, opts.Token)
}
func larkNotifierFunc(opts notifierOptions) (Interface, error) {
return NewLark(opts.URL)
}
func matrixNotifierFunc(opts notifierOptions) (Interface, error) {
return NewMatrix(opts.URL, opts.Token, opts.Channel, opts.CertPool)
}
func opsgenieNotifierFunc(opts notifierOptions) (Interface, error) {
return NewOpsgenie(opts.URL, opts.ProxyURL, opts.CertPool, opts.Token)
}
func alertmanagerNotifierFunc(opts notifierOptions) (Interface, error) {
return NewAlertmanager(opts.URL, opts.ProxyURL, opts.CertPool, opts.Token)
}
func grafanaNotifierFunc(opts notifierOptions) (Interface, error) {
return NewGrafana(opts.URL, opts.ProxyURL, opts.Token, opts.CertPool, opts.Username, opts.Password)
}
func pagerDutyNotifierFunc(opts notifierOptions) (Interface, error) {
return NewPagerDuty(opts.URL, opts.ProxyURL, opts.CertPool, opts.Channel)
}
func dataDogNotifierFunc(opts notifierOptions) (Interface, error) {
return NewDataDog(opts.URL, opts.ProxyURL, opts.CertPool, opts.Token)
}
func natsNotifierFunc(opts notifierOptions) (Interface, error) {
return NewNATS(opts.URL, opts.Channel, opts.Username, opts.Password)
}
func gitHubNotifierFunc(opts notifierOptions) (Interface, error) {
if opts.Token == "" && opts.Password != "" {
opts.Token = opts.Password
}
return NewGitHub(opts.CommitStatus, opts.URL, opts.Token, opts.CertPool, opts.ProxyURL, opts.ProviderName, opts.ProviderNamespace, opts.SecretData, opts.TokenCache)
}
func gitHubDispatchNotifierFunc(opts notifierOptions) (Interface, error) {
if opts.Token == "" && opts.Password != "" {
opts.Token = opts.Password
}
return NewGitHubDispatch(opts.URL, opts.Token, opts.CertPool, opts.ProxyURL, opts.ProviderName, opts.ProviderNamespace, opts.SecretData, opts.TokenCache)
}
func gitLabNotifierFunc(opts notifierOptions) (Interface, error) {
if opts.Token == "" && opts.Password != "" {
opts.Token = opts.Password
}
return NewGitLab(opts.CommitStatus, opts.URL, opts.Token, opts.CertPool)
}
func giteaNotifierFunc(opts notifierOptions) (Interface, error) {
if opts.Token == "" && opts.Password != "" {
opts.Token = opts.Password
}
return NewGitea(opts.CommitStatus, opts.URL, opts.ProxyURL, opts.Token, opts.CertPool)
}
func bitbucketServerNotifierFunc(opts notifierOptions) (Interface, error) {
return NewBitbucketServer(opts.CommitStatus, opts.URL, opts.Token, opts.CertPool, opts.Username, opts.Password)
}
func bitbucketNotifierFunc(opts notifierOptions) (Interface, error) {
return NewBitbucket(opts.CommitStatus, opts.URL, opts.Token, opts.CertPool)
}
func azureDevOpsNotifierFunc(opts notifierOptions) (Interface, error) {
return NewAzureDevOps(opts.CommitStatus, opts.URL, opts.Token, opts.CertPool)
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/x509"
"encoding/json"
"fmt"
"net/url"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
)
// NotificationHeader is a header sent to identify requests from the
// notification controller.
const NotificationHeader = "gotk-component"
// Forwarder is an implementation of the notification Interface that posts the
// body as an HTTP request using an optional proxy.
type Forwarder struct {
URL string
ProxyURL string
Headers map[string]string
CertPool *x509.CertPool
HMACKey []byte
}
func NewForwarder(hookURL string, proxyURL string, headers map[string]string, certPool *x509.CertPool, hmacKey []byte) (*Forwarder, error) {
if _, err := url.ParseRequestURI(hookURL); err != nil {
return nil, fmt.Errorf("invalid hook URL %s: %w", hookURL, err)
}
if hmacKey != nil && len(hmacKey) == 0 {
return nil, fmt.Errorf("HMAC key is empty")
}
return &Forwarder{
URL: hookURL,
ProxyURL: proxyURL,
Headers: headers,
CertPool: certPool,
HMACKey: hmacKey,
}, nil
}
func sign(payload, key []byte) string {
h := hmac.New(sha256.New, key)
h.Write(payload)
return fmt.Sprintf("%x", h.Sum(nil))
}
func (f *Forwarder) Post(ctx context.Context, event eventv1.Event) error {
var sig string
if len(f.HMACKey) != 0 {
eventJSON, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed marshalling event: %w", err)
}
sig = fmt.Sprintf("sha256=%s", sign(eventJSON, f.HMACKey))
}
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Set(NotificationHeader, event.ReportingController)
for key, val := range f.Headers {
req.Header.Set(key, val)
}
if sig != "" {
req.Header.Set("X-Signature", sig)
}
}),
}
if f.ProxyURL != "" {
opts = append(opts, withProxy(f.ProxyURL))
}
if f.CertPool != nil {
opts = append(opts, withCertPool(f.CertPool))
}
if err := postMessage(ctx, f.URL, event, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"code.gitea.io/sdk/gitea"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
ctrl "sigs.k8s.io/controller-runtime"
)
type Gitea struct {
BaseURL string
Token string
Owner string
Repo string
CommitStatus string
Client *gitea.Client
Debug bool
}
var _ Interface = &Gitea{}
func NewGitea(commitStatus string, addr string, proxyURL string, token string, certPool *x509.CertPool) (*Gitea, error) {
if len(token) == 0 {
return nil, errors.New("gitea token cannot be empty")
}
// this should never happen
if commitStatus == "" {
return nil, errors.New("commit status cannot be empty")
}
host, id, err := parseGitAddress(addr)
if err != nil {
return nil, fmt.Errorf("failed parsing Git URL: %w", err)
}
if _, err := url.Parse(host); err != nil {
return nil, fmt.Errorf("failed parsing host: %w", err)
}
idComponents := strings.Split(id, "/")
if len(idComponents) != 2 {
return nil, fmt.Errorf("invalid repository id %q", id)
}
tr := &http.Transport{}
if certPool != nil {
tr.TLSClientConfig = &tls.Config{
RootCAs: certPool,
}
}
if proxyURL != "" {
parsedProxyURL, err := url.Parse(proxyURL)
if err != nil {
return nil, errors.New("invalid proxy URL")
}
tr.Proxy = http.ProxyURL(parsedProxyURL)
}
client, err := gitea.NewClient(host, gitea.SetToken(token), gitea.SetHTTPClient(&http.Client{Transport: tr}))
if err != nil {
return nil, fmt.Errorf("failed creating Gitea client: %w", err)
}
return &Gitea{
BaseURL: host,
Token: token,
Owner: idComponents[0],
Repo: idComponents[1],
CommitStatus: commitStatus,
Client: client,
Debug: os.Getenv("NOTIFIER_GITEA_DEBUG") == "true",
}, nil
}
func (g *Gitea) Post(ctx context.Context, event eventv1.Event) error {
revString, ok := event.GetRevision()
if !ok {
return errors.New("missing revision metadata")
}
rev, err := parseRevision(revString)
if err != nil {
return err
}
state, err := toGiteaState(event)
if err != nil {
return err
}
_, desc := formatNameAndDescription(event)
id := g.CommitStatus
status := gitea.CreateStatusOption{
State: state,
TargetURL: "",
Description: desc,
Context: id,
}
listStatusesOpts := gitea.ListStatusesOption{
ListOptions: gitea.ListOptions{
Page: 0,
PageSize: 50,
},
}
statuses, _, err := g.Client.ListStatuses(g.Owner, g.Repo, rev, listStatusesOpts)
if err != nil {
return fmt.Errorf("could not list commit statuses: %w", err)
}
if duplicateGiteaStatus(statuses, &status) {
if g.Debug {
ctrl.Log.Info("gitea skip posting duplicate status",
"owner", g.Owner, "repo", g.Repo, "commit_hash", rev, "status", status)
}
return nil
}
if g.Debug {
ctrl.Log.Info("gitea create commit begin",
"base_url", g.BaseURL, "token", g.Token, "event", event, "status", status)
}
st, rsp, err := g.Client.CreateStatus(g.Owner, g.Repo, rev, status)
if err != nil {
if g.Debug {
ctrl.Log.Error(err, "gitea create commit failed", "status", status)
}
return err
}
if g.Debug {
ctrl.Log.Info("gitea create commit ok", "response", rsp, "response_status", st)
}
return nil
}
func toGiteaState(event eventv1.Event) (gitea.StatusState, error) {
// progressing events
if event.HasReason(meta.ProgressingReason) {
// pending
return gitea.StatusPending, nil
}
switch event.Severity {
case eventv1.EventSeverityInfo:
return gitea.StatusSuccess, nil
case eventv1.EventSeverityError:
return gitea.StatusFailure, nil
default:
return gitea.StatusError, errors.New("can't convert to gitea state")
}
}
// duplicateGiteaStatus return true if the latest status
// with a matching context has the same state and description
func duplicateGiteaStatus(statuses []*gitea.Status, status *gitea.CreateStatusOption) bool {
if status == nil || statuses == nil {
return false
}
for _, s := range statuses {
if s.Context == "" || s.State == "" || s.Description == "" {
continue
}
if s.Context == status.Context {
if s.State == status.State && s.Description == status.Description {
return true
}
return false
}
}
return false
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"errors"
"fmt"
"github.com/google/go-github/v64/github"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
pkgcache "github.com/fluxcd/pkg/cache"
)
type GitHub struct {
Owner string
Repo string
CommitStatus string
Client *github.Client
}
func NewGitHub(commitStatus string, addr string, token string, certPool *x509.CertPool, proxyURL string, providerName string, providerNamespace string, secretData map[string][]byte, tokenCache *pkgcache.TokenCache) (*GitHub, error) {
// this should never happen
if commitStatus == "" {
return nil, errors.New("commit status cannot be empty")
}
repoInfo, err := getRepoInfoAndGithubClient(addr, token, certPool, proxyURL, providerName, providerNamespace, secretData, tokenCache)
if err != nil {
return nil, err
}
return &GitHub{
Owner: repoInfo.owner,
Repo: repoInfo.repo,
CommitStatus: commitStatus,
Client: repoInfo.client,
}, nil
}
// Post Github commit status
func (g *GitHub) Post(ctx context.Context, event eventv1.Event) error {
// Skip progressing events
if event.HasReason(meta.ProgressingReason) {
return nil
}
revString, ok := event.GetRevision()
if !ok {
return errors.New("missing revision metadata")
}
rev, err := parseRevision(revString)
if err != nil {
return err
}
state, err := toGitHubState(event.Severity)
if err != nil {
return err
}
_, desc := formatNameAndDescription(event)
id := g.CommitStatus
status := &github.RepoStatus{
State: &state,
Context: &id,
Description: &desc,
}
opts := &github.ListOptions{PerPage: 50}
statuses, _, err := g.Client.Repositories.ListStatuses(ctx, g.Owner, g.Repo, rev, opts)
if err != nil {
return fmt.Errorf("could not list commit statuses: %v", err)
}
if duplicateGithubStatus(statuses, status) {
return nil
}
_, _, err = g.Client.Repositories.CreateStatus(ctx, g.Owner, g.Repo, rev, status)
if err != nil {
return fmt.Errorf("could not create commit status: %v", err)
}
return nil
}
func toGitHubState(severity string) (string, error) {
switch severity {
case eventv1.EventSeverityInfo:
return "success", nil
case eventv1.EventSeverityError:
return "failure", nil
default:
return "", errors.New("can't convert to github state")
}
}
// duplicateStatus return true if the latest status
// with a matching context has the same state and description
func duplicateGithubStatus(statuses []*github.RepoStatus, status *github.RepoStatus) bool {
if status == nil || statuses == nil {
return false
}
for _, s := range statuses {
if s.Context == nil || s.State == nil || s.Description == nil {
continue
}
if *s.Context == *status.Context {
if *s.State == *status.State && *s.Description == *status.Description {
return true
}
return false
}
}
return false
}
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"encoding/json"
"fmt"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
pkgcache "github.com/fluxcd/pkg/cache"
"github.com/google/go-github/v64/github"
)
type GitHubDispatch struct {
Owner string
Repo string
Client *github.Client
}
func NewGitHubDispatch(addr string, token string, certPool *x509.CertPool, proxyURL string, providerName string, providerNamespace string, secretData map[string][]byte, tokenCache *pkgcache.TokenCache) (*GitHubDispatch, error) {
repoInfo, err := getRepoInfoAndGithubClient(addr, token, certPool, proxyURL, providerName, providerNamespace, secretData, tokenCache)
if err != nil {
return nil, err
}
return &GitHubDispatch{
Owner: repoInfo.owner,
Repo: repoInfo.repo,
Client: repoInfo.client,
}, nil
}
// Post GitHub Repository Dispatch webhook
func (g *GitHubDispatch) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
eventType := fmt.Sprintf("%s/%s.%s",
event.InvolvedObject.Kind, event.InvolvedObject.Name, event.InvolvedObject.Namespace)
eventData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal object into json: %w", err)
}
eventDataRaw := json.RawMessage(eventData)
opts := github.DispatchRequestOptions{
EventType: eventType,
ClientPayload: &eventDataRaw,
}
_, _, err = g.Client.Repositories.Dispatch(ctx, g.Owner, g.Repo, opts)
if err != nil {
return fmt.Errorf("could not send github repository dispatch webhook: %v", err)
}
return nil
}
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/fluxcd/notification-controller/api/v1beta3"
pkgcache "github.com/fluxcd/pkg/cache"
"github.com/fluxcd/pkg/git/github"
authgithub "github.com/fluxcd/pkg/git/github"
"golang.org/x/oauth2"
gogithub "github.com/google/go-github/v64/github"
)
// repoInfo is an internal type encapsulating owner, repo and client
type repoInfo struct {
owner string
repo string
client *gogithub.Client
}
// getGitHubAppOptions constructs the github app authentication options.
func getGitHubAppOptions(providerName, providerNamespace, proxy string, secretData map[string][]byte, tokenCache *pkgcache.TokenCache) ([]github.OptFunc, error) {
githubOpts := []github.OptFunc{}
if val, ok := secretData[github.AppIDKey]; ok {
githubOpts = append(githubOpts, github.WithAppID(string(val)))
}
if val, ok := secretData[github.AppInstallationIDKey]; ok {
githubOpts = append(githubOpts, github.WithInstllationID(string(val)))
}
if val, ok := secretData[github.AppPrivateKey]; ok {
githubOpts = append(githubOpts, github.WithPrivateKey(val))
}
if val, ok := secretData[github.AppBaseUrlKey]; ok {
githubOpts = append(githubOpts, github.WithAppBaseURL(string(val)))
}
if len(githubOpts) > 0 && proxy != "" {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("error parsing proxy URL '%s': %w", proxy, err)
}
githubOpts = append(githubOpts, github.WithProxyURL(proxyURL))
}
if len(githubOpts) > 0 && tokenCache != nil {
githubOpts = append(githubOpts, github.WithCache(tokenCache, v1beta3.ProviderKind, providerName, providerNamespace, OperationPost))
}
return githubOpts, nil
}
// getRepoInfoAndGithubClient gets the github client and repository info used by Github and GithubDispatch providers
func getRepoInfoAndGithubClient(addr string, token string, certPool *x509.CertPool, proxyURL string, providerName string, providerNamespace string, secretData map[string][]byte, tokenCache *pkgcache.TokenCache) (*repoInfo, error) {
if len(token) == 0 {
githubOpts, err := getGitHubAppOptions(providerName, providerNamespace, proxyURL, secretData, tokenCache)
if err != nil {
return nil, err
}
if len(githubOpts) == 0 {
return nil, errors.New("github token or github app details must be specified")
}
client, err := authgithub.New(githubOpts...)
if err != nil {
return nil, err
}
appToken, err := client.GetToken(context.Background())
if err != nil {
return nil, err
}
token = appToken.Token
}
host, id, err := parseGitAddress(addr)
if err != nil {
return nil, err
}
comp := strings.Split(id, "/")
if len(comp) != 2 {
return nil, fmt.Errorf("invalid repository id %q", id)
}
baseUrl, err := url.Parse(host)
if err != nil {
return nil, err
}
ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
tc := oauth2.NewClient(context.Background(), ts)
client := gogithub.NewClient(tc)
if baseUrl.Host != "github.com" {
if certPool != nil {
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
},
}
hc := &http.Client{Transport: tr}
ctx := context.WithValue(context.Background(), oauth2.HTTPClient, hc)
tc = oauth2.NewClient(ctx, ts)
}
client, err = gogithub.NewClient(tc).WithEnterpriseURLs(host, host)
if err != nil {
return nil, fmt.Errorf("could not create enterprise GitHub client: %v", err)
}
}
return &repoInfo{comp[0], comp[1], client}, nil
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
gitlab "gitlab.com/gitlab-org/api/client-go"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
)
type GitLab struct {
Id string
CommitStatus string
Client *gitlab.Client
}
func NewGitLab(commitStatus string, addr string, token string, certPool *x509.CertPool) (*GitLab, error) {
if len(token) == 0 {
return nil, errors.New("gitlab token cannot be empty")
}
host, id, err := parseGitAddress(addr)
if err != nil {
return nil, err
}
// this should never happen
if commitStatus == "" {
return nil, errors.New("commit status cannot be empty")
}
opts := []gitlab.ClientOptionFunc{gitlab.WithBaseURL(host)}
if certPool != nil {
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
},
}
hc := &http.Client{Transport: tr}
opts = append(opts, gitlab.WithHTTPClient(hc))
}
client, err := gitlab.NewClient(token, opts...)
if err != nil {
return nil, err
}
gitlab := &GitLab{
Id: id,
CommitStatus: commitStatus,
Client: client,
}
return gitlab, nil
}
// Post GitLab commit status
func (g *GitLab) Post(ctx context.Context, event eventv1.Event) error {
// Skip progressing events
if event.HasReason(meta.ProgressingReason) {
return nil
}
revString, ok := event.GetRevision()
if !ok {
return errors.New("missing revision metadata")
}
rev, err := parseRevision(revString)
if err != nil {
return err
}
state, err := toGitLabState(event.Severity)
if err != nil {
return err
}
_, desc := formatNameAndDescription(event)
id := g.CommitStatus
status := &gitlab.CommitStatus{
Name: id,
SHA: rev,
Status: string(state),
Description: desc,
}
getOpt := &gitlab.GetCommitStatusesOptions{
Name: &status.Name,
}
statuses, _, err := g.Client.Commits.GetCommitStatuses(g.Id, rev, getOpt, gitlab.WithContext(ctx))
if err != nil {
return fmt.Errorf("unable to list commit status: %s", err)
}
if duplicateGitlabStatus(statuses, status) {
return nil
}
setOpt := &gitlab.SetCommitStatusOptions{
Name: &id,
Description: &desc,
State: state,
}
_, _, err = g.Client.Commits.SetCommitStatus(g.Id, rev, setOpt, gitlab.WithContext(ctx))
if err != nil {
return err
}
return nil
}
func toGitLabState(severity string) (gitlab.BuildStateValue, error) {
switch severity {
case eventv1.EventSeverityInfo:
return gitlab.Success, nil
case eventv1.EventSeverityError:
return gitlab.Failed, nil
default:
return "", errors.New("can't convert to gitlab state")
}
}
func duplicateGitlabStatus(statuses []*gitlab.CommitStatus, status *gitlab.CommitStatus) bool {
for _, s := range statuses {
if s.SHA == status.SHA {
if s.Status == status.Status && s.Description == status.Description && s.Name == status.Name {
return true
}
}
}
return false
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"fmt"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
// Slack holds the hook URL
type GoogleChat struct {
URL string
ProxyURL string
Username string
Channel string
}
// GoogleChatPayload holds the channel and attachments
type GoogleChatPayload struct {
Cards []GoogleChatCard `json:"cards"`
}
type GoogleChatCard struct {
Header GoogleChatCardHeader `json:"header"`
Sections []GoogleChatCardSection `json:"sections"`
}
type GoogleChatCardHeader struct {
Title string `json:"title"`
SubTitle string `json:"subtitle"`
ImageUrl *string `json:"imageUrl"`
ImageStyle *string `json:"imageStyle"`
}
type GoogleChatCardSection struct {
Header string `json:"header"`
Widgets []GoogleChatCardWidget `json:"widgets"`
}
type GoogleChatCardWidget struct {
TextParagraph *GoogleChatCardWidgetTextParagraph `json:"textParagraph"`
KeyValue *GoogleChatCardWidgetKeyValue `json:"keyValue"`
}
type GoogleChatCardWidgetTextParagraph struct {
Text string `json:"text"`
}
type GoogleChatCardWidgetKeyValue struct {
TopLabel string `json:"topLabel"`
Content string `json:"content"`
ContentMultiLine bool `json:"contentMultiline"`
BottomLabel *string `json:"bottomLabel"`
Icon *string `json:"icon"`
}
// NewGoogleChat validates the Google Chat URL and returns a GoogleChat object
func NewGoogleChat(hookURL string, proxyURL string) (*GoogleChat, error) {
_, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Google Chat hook URL %s", hookURL)
}
return &GoogleChat{
URL: hookURL,
ProxyURL: proxyURL,
}, nil
}
// Post Google Chat message
func (s *GoogleChat) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
// Header
objName := fmt.Sprintf("%s/%s.%s", strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace)
header := GoogleChatCardHeader{
Title: objName,
SubTitle: event.ReportingController,
}
sections := make([]GoogleChatCardSection, 0)
// Message
messageText := event.Message
if event.Severity == eventv1.EventSeverityError {
messageText = fmt.Sprintf("<font color=\"#ff0000\">%s</font>", event.Message)
}
sections = append(sections, GoogleChatCardSection{
Widgets: []GoogleChatCardWidget{
{
TextParagraph: &GoogleChatCardWidgetTextParagraph{
Text: messageText,
},
},
},
})
// Meta-Data
if len(event.Metadata) > 0 {
kvfields := make([]GoogleChatCardWidget, 0, len(event.Metadata))
for k, v := range event.Metadata {
kvfields = append(kvfields, GoogleChatCardWidget{
KeyValue: &GoogleChatCardWidgetKeyValue{
TopLabel: k,
Content: v,
ContentMultiLine: false,
},
})
}
sections = append(sections, GoogleChatCardSection{
Widgets: kvfields,
})
}
card := GoogleChatCard{
Header: header,
Sections: sections,
}
payload := GoogleChatPayload{
Cards: []GoogleChatCard{card},
}
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"encoding/json"
"errors"
"fmt"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"sigs.k8s.io/controller-runtime/pkg/log"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
type (
// GooglePubSub holds a Google Pub/Sub client and target topic.
GooglePubSub struct {
topicID string
attrs map[string]string
topicName string
client interface {
publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error)
}
}
googlePubSubClient struct {
projectID string
jsonCreds []byte
}
)
// ensure *GooglePubSub implements Interface.
var _ Interface = &GooglePubSub{}
// NewGooglePubSub creates a Google Pub/Sub client tied to a specific
// project and topic.
//
// The jsonCreds parameter is optional, and if len(jsonCreds) == 0 then the
// automatic authentication methods of the Google libraries will take place,
// and therefore methods like Workload Identity will be automatically attempted.
//
// The attrs paramter is optional, and if len(attrs) == 0 then no attributes will
// be added to the Pub/Sub message.
func NewGooglePubSub(projectID, topicID, jsonCreds string, attrs map[string]string) (*GooglePubSub, error) {
if projectID == "" {
return nil, errors.New("GCP project ID cannot be empty")
}
if topicID == "" {
return nil, errors.New("GCP Pub/Sub topic ID cannot be empty")
}
if len(attrs) == 0 {
attrs = nil
}
return &GooglePubSub{
topicID: topicID,
attrs: attrs,
topicName: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
client: &googlePubSubClient{
projectID: projectID,
jsonCreds: []byte(jsonCreds),
},
}, nil
}
// Post posts Flux events to a Google Pub/Sub topic.
func (g *GooglePubSub) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
eventPayload, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("error json-marshaling event: %w", err)
}
serverID, err := g.client.publish(ctx, g.topicID, eventPayload, g.attrs)
if err != nil {
return fmt.Errorf("error publishing event to topic %s: %w", g.topicName, err)
}
// debug log
log.FromContext(ctx).V(1).Info("Event published to GCP Pub/Sub topic",
"topic", g.topicName,
"server message id", serverID)
return nil
}
func (g *googlePubSubClient) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) {
var opts []option.ClientOption
if len(g.jsonCreds) > 0 {
opts = append(opts, option.WithCredentialsJSON(g.jsonCreds))
}
var client *pubsub.Client
client, err = pubsub.NewClient(ctx, g.projectID, opts...)
if err != nil {
return
}
defer func() {
if closeErr := client.Close(); closeErr != nil {
if err != nil {
err = kerrors.NewAggregate([]error{err, closeErr})
} else {
err = closeErr
}
}
}()
serverID, err = client.
Topic(topicID).
Publish(ctx, &pubsub.Message{
Data: eventPayload,
Attributes: attrs,
}).
Get(ctx)
return
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"fmt"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
)
type Grafana struct {
URL string
Token string
ProxyURL string
CertPool *x509.CertPool
Username string
Password string
}
// GraphitePayload represents a Grafana API annotation in Graphite format
type GraphitePayload struct {
When int64 `json:"when"` //optional unix timestamp (ms)
Text string `json:"text"`
Tags []string `json:"tags,omitempty"`
}
// NewGrafana validates the Grafana URL and returns a Grafana object
func NewGrafana(URL string, proxyURL string, token string, certPool *x509.CertPool, username string, password string) (*Grafana, error) {
_, err := url.ParseRequestURI(URL)
if err != nil {
return nil, fmt.Errorf("invalid Grafana URL %s", URL)
}
return &Grafana{
URL: URL,
ProxyURL: proxyURL,
Token: token,
CertPool: certPool,
Username: username,
Password: password,
}, nil
}
// Post annotation
func (g *Grafana) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
sfields := make([]string, 0, len(event.Metadata))
// add tag to filter on grafana
sfields = append(sfields, "flux", event.ReportingController)
for k, v := range event.Metadata {
key := strings.ReplaceAll(k, ":", "|")
value := strings.ReplaceAll(v, ":", "|")
sfields = append(sfields, fmt.Sprintf("%s: %s", key, value))
}
sfields = append(sfields, fmt.Sprintf("kind: %s", event.InvolvedObject.Kind))
sfields = append(sfields, fmt.Sprintf("name: %s", event.InvolvedObject.Name))
sfields = append(sfields, fmt.Sprintf("namespace: %s", event.InvolvedObject.Namespace))
payload := GraphitePayload{
When: event.Timestamp.Unix(),
Text: fmt.Sprintf("%s/%s.%s", strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace),
Tags: sfields,
}
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
if (g.Username != "" && g.Password != "") && g.Token == "" {
req.Header.Add("Authorization", "Basic "+basicAuth(g.Username, g.Password))
}
if g.Token != "" {
req.Header.Add("Authorization", "Bearer "+g.Token)
}
}),
}
if g.ProxyURL != "" {
opts = append(opts, withProxy(g.ProxyURL))
}
if g.CertPool != nil {
opts = append(opts, withCertPool(g.CertPool))
}
if err := postMessage(ctx, g.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
package notifier
import (
"context"
"fmt"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
type Lark struct {
URL string
}
type LarkPayload struct {
MsgType string `json:"msg_type"`
Card LarkCard `json:"card"`
}
type LarkCard struct {
Config LarkConfig `json:"config"`
Header LarkHeader `json:"header"`
Elements []LarkElement `json:"elements"`
}
type LarkConfig struct {
WideScreenMode bool `json:"wide_screen_mode"`
}
type LarkHeader struct {
Title LarkTitle `json:"title"`
Template string `json:"template"`
}
type LarkTitle struct {
Tag string `json:"tag"`
Content string `json:"content"`
}
type LarkElement struct {
Tag string `json:"tag"`
Text LarkText `json:"text"`
}
type LarkText struct {
Tag string `json:"tag"`
Content string `json:"content"`
}
func NewLark(address string) (*Lark, error) {
_, err := url.ParseRequestURI(address)
if err != nil {
return nil, fmt.Errorf("invalid Slack hook URL %s", address)
}
return &Lark{
URL: address,
}, nil
}
func (l *Lark) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
emoji := "💫"
color := "turquoise"
if event.Severity == eventv1.EventSeverityError {
emoji = "🚨"
color = "red"
}
message := fmt.Sprintf("**%s**\n\n", event.Message)
for k, v := range event.Metadata {
message = message + fmt.Sprintf("%s: %s\n", k, v)
}
element := LarkElement{
Tag: "div",
Text: LarkText{
Tag: "lark_md",
Content: message,
},
}
card := LarkCard{
Config: LarkConfig{
WideScreenMode: true,
},
Header: LarkHeader{
Title: LarkTitle{
Tag: "plain_text",
Content: fmt.Sprintf("%s %s/%s.%s", emoji, strings.ToLower(event.InvolvedObject.Kind),
event.InvolvedObject.Name, event.InvolvedObject.Namespace),
},
Template: color,
},
Elements: []LarkElement{
element,
},
}
payload := LarkPayload{
MsgType: "interactive",
Card: card,
}
return postMessage(ctx, l.URL, payload)
}
package notifier
import (
"context"
"crypto/sha1"
"crypto/x509"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
)
type Matrix struct {
Token string
URL string
RoomId string
CertPool *x509.CertPool
}
type MatrixPayload struct {
Body string `json:"body"`
MsgType string `json:"msgtype"`
}
func NewMatrix(serverURL, token, roomId string, certPool *x509.CertPool) (*Matrix, error) {
_, err := url.ParseRequestURI(serverURL)
if err != nil {
return nil, fmt.Errorf("invalid Matrix homeserver URL %s: '%w'", serverURL, err)
}
return &Matrix{
URL: serverURL,
RoomId: roomId,
Token: token,
CertPool: certPool,
}, nil
}
func (m *Matrix) Post(ctx context.Context, event eventv1.Event) error {
txId, err := sha1sum(event)
if err != nil {
return fmt.Errorf("unable to generate unique tx id: %s", err)
}
fullURL := fmt.Sprintf("%s/_matrix/client/r0/rooms/%s/send/m.room.message/%s",
m.URL, m.RoomId, txId)
emoji := "💫"
if event.Severity == eventv1.EventSeverityError {
emoji = "🚨"
}
var metadata string
for k, v := range event.Metadata {
metadata = metadata + fmt.Sprintf("- %s: %s\n", k, v)
}
heading := fmt.Sprintf("%s %s/%s.%s", emoji, strings.ToLower(event.InvolvedObject.Kind),
event.InvolvedObject.Name, event.InvolvedObject.Namespace)
msg := fmt.Sprintf("%s\n%s\n%s", heading, event.Message, metadata)
payload := MatrixPayload{
Body: msg,
MsgType: "m.text",
}
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Method = http.MethodPut
req.Header.Add("Authorization", "Bearer "+m.Token)
}),
}
if m.CertPool != nil {
opts = append(opts, withCertPool(m.CertPool))
}
if err := postMessage(ctx, fullURL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
func sha1sum(event eventv1.Event) (string, error) {
val, err := json.Marshal(event)
if err != nil {
return "", err
}
digest := sha1.Sum(val)
return fmt.Sprintf("%x", digest), nil
}
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"encoding/json"
"errors"
"fmt"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/nats-io/nats.go"
"sigs.k8s.io/controller-runtime/pkg/log"
)
type (
// NATS holds a NATS client and target subject.
NATS struct {
subject string
client interface {
publish(ctx context.Context, subject string, eventPayload []byte) (err error)
}
}
natsClient struct {
server string
username string
password string
}
)
func NewNATS(server string, subject string, username string, password string) (*NATS, error) {
if server == "" {
return nil, errors.New("NATS server (address) cannot be empty")
}
if subject == "" {
return nil, errors.New("NATS subject (channel) cannot be empty")
}
return &NATS{
subject: subject,
client: &natsClient{
server: server,
username: username,
password: password,
},
}, nil
}
// Post posts Flux events to a NATS subject.
func (n *NATS) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
eventPayload, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("error json-marshaling event: %w", err)
}
err = n.client.publish(ctx, n.subject, eventPayload)
if err != nil {
return fmt.Errorf("error publishing event to subject %s: %w", n.subject, err)
}
// debug log
log.FromContext(ctx).V(1).Info("Event published to NATS subject", "subject", n.subject)
return nil
}
func (n *natsClient) publish(ctx context.Context, subject string, eventPayload []byte) (err error) {
opts := []nats.Option{nats.Name("NATS Provider Publisher")}
if n.username != "" && n.password != "" {
opts = append(opts, nats.UserInfo(n.username, n.password))
}
nc, err := nats.Connect(n.server, opts...)
if err != nil {
return fmt.Errorf("error connecting to server: %w", err)
}
defer nc.Close()
nc.Publish(subject, eventPayload)
nc.Flush()
if err = nc.LastError(); err != nil {
return fmt.Errorf("error publishing message to server: %w", err)
}
return err
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
type NopNotifier struct{}
func (n *NopNotifier) Post(ctx context.Context, event eventv1.Event) error {
return nil
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"errors"
"fmt"
"net/url"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
)
type Opsgenie struct {
URL string
ProxyURL string
CertPool *x509.CertPool
ApiKey string
}
type OpsgenieAlert struct {
Message string `json:"message"`
Description string `json:"description"`
Details map[string]string `json:"details"`
}
func NewOpsgenie(hookURL string, proxyURL string, certPool *x509.CertPool, token string) (*Opsgenie, error) {
_, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Opsgenie hook URL %s: '%w'", hookURL, err)
}
if token == "" {
return nil, errors.New("empty Opsgenie apikey/token")
}
return &Opsgenie{
URL: hookURL,
ProxyURL: proxyURL,
CertPool: certPool,
ApiKey: token,
}, nil
}
// Post opsgenie alert message
func (s *Opsgenie) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
var details = make(map[string]string)
if event.Metadata != nil {
details = event.Metadata
}
details["severity"] = event.Severity
payload := OpsgenieAlert{
Message: event.InvolvedObject.Kind + "/" + event.InvolvedObject.Name,
Description: event.Message,
Details: details,
}
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Set("Authorization", "GenieKey "+s.ApiKey)
}),
}
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2023 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"fmt"
"net/url"
"time"
"github.com/PagerDuty/go-pagerduty"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
)
type PagerDuty struct {
Endpoint string
RoutingKey string
ProxyURL string
CertPool *x509.CertPool
}
func NewPagerDuty(endpoint string, proxyURL string, certPool *x509.CertPool, routingKey string) (*PagerDuty, error) {
URL, err := url.ParseRequestURI(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid PagerDuty endpoint URL %q: '%w'", endpoint, err)
}
return &PagerDuty{
Endpoint: URL.Scheme + "://" + URL.Host,
RoutingKey: routingKey,
ProxyURL: proxyURL,
CertPool: certPool,
}, nil
}
func (p *PagerDuty) Post(ctx context.Context, event eventv1.Event) error {
// skip commit status updates and progressing events (we want success or failure)
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) || event.HasReason(meta.ProgressingReason) {
return nil
}
var opts []postOption
if p.ProxyURL != "" {
opts = append(opts, withProxy(p.ProxyURL))
}
if p.CertPool != nil {
opts = append(opts, withCertPool(p.CertPool))
}
if err := postMessage(
ctx,
p.Endpoint+"/v2/enqueue",
toPagerDutyV2Event(event, p.RoutingKey),
opts...,
); err != nil {
return fmt.Errorf("failed sending event: %w", err)
}
// Send a change event for info events
if event.Severity == eventv1.EventSeverityInfo {
if err := postMessage(
ctx,
p.Endpoint+"/v2/change/enqueue",
toPagerDutyChangeEvent(event, p.RoutingKey),
opts...,
); err != nil {
return fmt.Errorf("failed sending change event: %w", err)
}
}
return nil
}
func toPagerDutyV2Event(event eventv1.Event, routingKey string) pagerduty.V2Event {
name, desc := formatNameAndDescription(event)
// Send resolve just in case an existing incident is open
e := pagerduty.V2Event{
RoutingKey: routingKey,
Action: "resolve",
DedupKey: string(event.InvolvedObject.UID),
}
// Trigger an incident for errors
if event.Severity == eventv1.EventSeverityError {
e.Action = "trigger"
e.Payload = &pagerduty.V2Payload{
Summary: desc + ": " + name,
Source: "Flux " + event.ReportingController,
Severity: toPagerDutySeverity(event.Severity),
Timestamp: event.Timestamp.Format(time.RFC3339),
Component: event.InvolvedObject.Name,
Group: event.InvolvedObject.Kind,
Details: map[string]interface{}{
"message": event.Message,
"metadata": event.Metadata,
},
}
}
return e
}
func toPagerDutyChangeEvent(event eventv1.Event, routingKey string) pagerduty.ChangeEvent {
name, desc := formatNameAndDescription(event)
ce := pagerduty.ChangeEvent{
RoutingKey: routingKey,
Payload: pagerduty.ChangeEventPayload{
Summary: desc + ": " + name,
Source: "Flux " + event.ReportingController,
Timestamp: event.Timestamp.Format(time.RFC3339),
CustomDetails: map[string]interface{}{
"message": event.Message,
"metadata": event.Metadata,
},
},
}
return ce
}
func toPagerDutySeverity(severity string) string {
switch severity {
case eventv1.EventSeverityError:
case eventv1.EventSeverityInfo:
return severity
case eventv1.EventSeverityTrace:
return "info"
}
return "error"
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"fmt"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
// Rocket holds the hook URL
type Rocket struct {
URL string
ProxyURL string
Username string
Channel string
CertPool *x509.CertPool
}
// NewRocket validates the Rocket URL and returns a Rocket object
func NewRocket(hookURL string, proxyURL string, certPool *x509.CertPool, username string, channel string) (*Rocket, error) {
_, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Rocket hook URL %s: '%w'", hookURL, err)
}
return &Rocket{
Channel: channel,
URL: hookURL,
ProxyURL: proxyURL,
Username: username,
CertPool: certPool,
}, nil
}
// Post Rocket message
func (s *Rocket) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
payload := SlackPayload{
Channel: s.Channel,
Username: s.Username,
}
color := "#0076D7"
if event.Severity == eventv1.EventSeverityError {
color = "#FF0000"
}
sfields := make([]SlackField, 0, len(event.Metadata))
for k, v := range event.Metadata {
sfields = append(sfields, SlackField{k, v, false})
}
a := SlackAttachment{
Color: color,
AuthorName: fmt.Sprintf("%s/%s.%s", strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace),
Text: event.Message,
MrkdwnIn: []string{"text"},
Fields: sfields,
}
payload.Attachments = []SlackAttachment{a}
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/getsentry/sentry-go"
)
// Sentry holds the client instance
type Sentry struct {
Client *sentry.Client
}
// NewSentry creates a Sentry client from the provided Data Source Name (DSN)
func NewSentry(certPool *x509.CertPool, dsn string, environment string) (*Sentry, error) {
tr := &http.Transport{}
if certPool != nil {
tr = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
},
}
}
client, err := sentry.NewClient(sentry.ClientOptions{
Dsn: dsn,
Environment: environment,
HTTPTransport: tr,
TracesSampleRate: 1,
})
if err != nil {
return nil, err
}
return &Sentry{
Client: client,
}, nil
}
// Post event to Sentry
func (s *Sentry) Post(ctx context.Context, event eventv1.Event) error {
var sev *sentry.Event
// Send event to Sentry
switch event.Severity {
case eventv1.EventSeverityInfo:
// Info is sent as a trace
sev = eventToSpan(event)
case eventv1.EventSeverityError:
// Errors are sent as normal events
sev = toSentryEvent(event)
}
s.Client.CaptureEvent(sev, nil, nil)
return nil
}
// Convert a controller event to a Sentry trace
// Sentry traces work slightly different compared to normal events, they don't cause
// alerts by default and are saved differently.
// They are shown in a dashobard with graphs, so they can be used to check if and how often
// flux tasks are running
func eventToSpan(event eventv1.Event) *sentry.Event {
obj := event.InvolvedObject
// Sadly you can't create spans on specific clients, they are always auto-generated
// from the context, and the client saved within
span := sentry.StartSpan(context.Background(), "event")
// TODO: Maybe change the tag names?
span.SetTag("flux_involved_object_kind", obj.Kind)
span.SetTag("flux_involved_object_namespace", obj.Namespace)
span.SetTag("flux_involved_object_name", obj.Name)
span.SetTag("flux_reporting_controller", event.ReportingController)
span.SetTag("flux_reporting_instance", event.ReportingInstance)
span.SetTag("flux_reason", event.Reason)
span.StartTime = event.Timestamp.Time
span.EndTime = event.Timestamp.Time
for k, v := range event.Metadata {
span.SetTag(k, v)
}
// So because the sentry-go sdk has no way to send transactions
// with an explicit client, we have to do it ourselves
return &sentry.Event{
Type: "transaction",
Transaction: eventSummary(event),
Message: event.Message,
Contexts: map[string]sentry.Context{
"trace": sentry.TraceContext{
TraceID: span.TraceID,
SpanID: span.SpanID,
ParentSpanID: span.ParentSpanID,
Op: span.Op,
Description: span.Description,
Status: span.Status,
}.Map(),
},
Tags: span.Tags,
Extra: span.Data,
Timestamp: span.EndTime,
StartTime: span.StartTime,
Spans: []*sentry.Span{span},
}
}
func eventSummary(event eventv1.Event) string {
obj := event.InvolvedObject
return fmt.Sprintf("%s: %s/%s", obj.Kind, obj.Namespace, obj.Name)
}
// Maps a controller-issued event to a Sentry event
func toSentryEvent(event eventv1.Event) *sentry.Event {
// Prepare Metadata
extra := make(map[string]interface{}, len(event.Metadata))
for k, v := range event.Metadata {
extra[k] = v
}
// Construct event
return &sentry.Event{
Timestamp: event.Timestamp.Time,
Level: sentry.Level(event.Severity),
ServerName: event.ReportingController,
Transaction: eventSummary(event),
Extra: extra,
Message: event.Message,
}
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"encoding/json"
"fmt"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
)
// Slack holds the hook URL
type Slack struct {
URL string
ProxyURL string
Token string
Username string
Channel string
CertPool *x509.CertPool
}
// SlackPayload holds the channel and attachments
type SlackPayload struct {
Channel string `json:"channel"`
Username string `json:"username"`
IconUrl string `json:"icon_url"`
IconEmoji string `json:"icon_emoji"`
Text string `json:"text,omitempty"`
Attachments []SlackAttachment `json:"attachments,omitempty"`
}
// SlackAttachment holds the markdown message body
type SlackAttachment struct {
Color string `json:"color"`
AuthorName string `json:"author_name"`
Text string `json:"text"`
MrkdwnIn []string `json:"mrkdwn_in"`
Fields []SlackField `json:"fields"`
}
type SlackField struct {
Title string `json:"title"`
Value string `json:"value"`
Short bool `json:"short"`
}
// NewSlack validates the Slack URL and returns a Slack object
func NewSlack(hookURL string, proxyURL string, token string, certPool *x509.CertPool, username string, channel string) (*Slack, error) {
_, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Slack hook URL %s: '%w'", hookURL, err)
}
return &Slack{
Channel: channel,
Username: username,
URL: hookURL,
ProxyURL: proxyURL,
Token: token,
CertPool: certPool,
}, nil
}
// Post Slack message
func (s *Slack) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
payload := SlackPayload{
Username: s.Username,
}
if s.Channel != "" {
payload.Channel = s.Channel
}
if payload.Username == "" {
payload.Username = event.ReportingController
}
color := "good"
if event.Severity == eventv1.EventSeverityError {
color = "danger"
}
sfields := make([]SlackField, 0, len(event.Metadata))
for k, v := range event.Metadata {
sfields = append(sfields, SlackField{k, v, false})
}
a := SlackAttachment{
Color: color,
AuthorName: fmt.Sprintf("%s/%s.%s", strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace),
Text: event.Message,
MrkdwnIn: []string{"text"},
Fields: sfields,
}
payload.Attachments = []SlackAttachment{a}
opts := []postOption{
withRequestModifier(func(request *retryablehttp.Request) {
if s.Token != "" {
request.Header.Add("Authorization", "Bearer "+s.Token)
}
}),
}
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if s.URL == "https://slack.com/api/chat.postMessage" {
opts = append(opts, withResponseValidator(validateSlackResponse))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
// validateSlackResponse validates that a chat.postMessage API response is successful.
// chat.postMessage API always returns 200 OK.
// See https://api.slack.com/methods/chat.postMessage.
//
// On the other hand, incoming webhooks return more expressive HTTP status codes.
// See https://api.slack.com/messaging/webhooks#handling_errors.
func validateSlackResponse(_ int, body []byte) error {
type slackResponse struct {
Ok bool `json:"ok"`
Error string `json:"error"`
}
slackResp := slackResponse{}
if err := json.Unmarshal(body, &slackResp); err != nil {
return fmt.Errorf("unable to unmarshal response body: %w", err)
}
if slackResp.Ok {
return nil
}
return fmt.Errorf("Slack responded with error: %s", slackResp.Error)
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"fmt"
"net/url"
"slices"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
const (
msTeamsSchemaDeprecatedConnector = iota
msTeamsSchemaAdaptiveCard
// msAdaptiveCardVersion is the version of the MS Adaptive Card schema.
// MS Teams currently supports only up to version 1.4:
// https://community.powerplatform.com/forums/thread/details/?threadid=edde0a5d-e995-4ba3-96dc-2120fe51a4d0
msAdaptiveCardVersion = "1.4"
)
// MS Teams holds the incoming webhook URL
type MSTeams struct {
URL string
ProxyURL string
CertPool *x509.CertPool
Schema int
}
// MSTeamsPayload holds the message card data
type MSTeamsPayload struct {
Type string `json:"@type"`
Context string `json:"@context"`
ThemeColor string `json:"themeColor"`
Summary string `json:"summary"`
Sections []MSTeamsSection `json:"sections"`
}
// MSTeamsSection holds the canary analysis result
type MSTeamsSection struct {
ActivityTitle string `json:"activityTitle"`
ActivitySubtitle string `json:"activitySubtitle"`
Facts []MSTeamsField `json:"facts"`
}
type MSTeamsField struct {
Name string `json:"name"`
Value string `json:"value"`
}
// The Adaptice Card payload structures below reflect this documentation:
// https://learn.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL%2Ctext1#send-adaptive-cards-using-an-incoming-webhook
type msAdaptiveCardMessage struct {
Type string `json:"type"`
Attachments []msAdaptiveCardAttachment `json:"attachments"`
}
type msAdaptiveCardAttachment struct {
ContentType string `json:"contentType"`
Content msAdaptiveCardContent `json:"content"`
}
type msAdaptiveCardContent struct {
Schema string `json:"$schema"`
Type string `json:"type"`
Version string `json:"version"`
Body []msAdaptiveCardBodyElement `json:"body"`
MSTeams msAdaptiveCardMSTeams `json:"msteams"`
}
type msAdaptiveCardBodyElement struct {
Type string `json:"type"`
*msAdaptiveCardContainer `json:",inline"`
*msAdaptiveCardTextBlock `json:",inline"`
*msAdaptiveCardFactSet `json:",inline"`
}
type msAdaptiveCardContainer struct {
Items []msAdaptiveCardBodyElement `json:"items,omitempty"`
}
type msAdaptiveCardMSTeams struct {
Width string `json:"width,omitempty"`
}
type msAdaptiveCardTextBlock struct {
Text string `json:"text,omitempty"`
Size string `json:"size,omitempty"`
Weight string `json:"weight,omitempty"`
Color string `json:"color,omitempty"`
Wrap bool `json:"wrap,omitempty"`
}
type msAdaptiveCardFactSet struct {
Facts []msAdaptiveCardFact `json:"facts,omitempty"`
}
type msAdaptiveCardFact struct {
Title string `json:"title"`
Value string `json:"value"`
}
// NewMSTeams validates the MS Teams URL and returns a MSTeams object
func NewMSTeams(hookURL string, proxyURL string, certPool *x509.CertPool) (*MSTeams, error) {
u, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid MS Teams webhook URL %s: '%w'", hookURL, err)
}
provider := &MSTeams{
URL: hookURL,
ProxyURL: proxyURL,
CertPool: certPool,
Schema: msTeamsSchemaAdaptiveCard,
}
// Check if the webhook URL is the deprecated connector and update the schema accordingly.
if strings.HasSuffix(strings.Split(u.Host, ":")[0], ".webhook.office.com") {
provider.Schema = msTeamsSchemaDeprecatedConnector
}
return provider, nil
}
// Post MS Teams message
func (s *MSTeams) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
objName := fmt.Sprintf("%s/%s.%s", strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace)
var payload any
switch s.Schema {
case msTeamsSchemaDeprecatedConnector:
payload = buildMSTeamsDeprecatedConnectorPayload(&event, objName)
case msTeamsSchemaAdaptiveCard:
payload = buildMSTeamsAdaptiveCardPayload(&event, objName)
default:
payload = buildMSTeamsAdaptiveCardPayload(&event, objName)
}
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
func buildMSTeamsDeprecatedConnectorPayload(event *eventv1.Event, objName string) *MSTeamsPayload {
facts := make([]MSTeamsField, 0, len(event.Metadata))
for k, v := range event.Metadata {
facts = append(facts, MSTeamsField{
Name: k,
Value: v,
})
}
payload := &MSTeamsPayload{
Type: "MessageCard",
Context: "http://schema.org/extensions",
ThemeColor: "0076D7",
Summary: objName,
Sections: []MSTeamsSection{
{
ActivityTitle: event.Message,
ActivitySubtitle: objName,
Facts: facts,
},
},
}
if event.Severity == eventv1.EventSeverityError {
payload.ThemeColor = "FF0000"
}
return payload
}
func buildMSTeamsAdaptiveCardPayload(event *eventv1.Event, objName string) *msAdaptiveCardMessage {
// Prepare message, add red color to error messages.
message := &msAdaptiveCardTextBlock{
Text: event.Message,
Wrap: true,
}
if event.Severity == eventv1.EventSeverityError {
message.Color = "attention"
}
// Put "summary" first, then sort the rest of the metadata by key.
facts := make([]msAdaptiveCardFact, 0, len(event.Metadata))
const summaryKey = "summary"
if summary, ok := event.Metadata[summaryKey]; ok {
facts = append(facts, msAdaptiveCardFact{
Title: summaryKey,
Value: summary,
})
}
metadataFirstIndex := len(facts)
for k, v := range event.Metadata {
if k == summaryKey {
continue
}
facts = append(facts, msAdaptiveCardFact{
Title: k,
Value: v,
})
}
slices.SortFunc(facts[metadataFirstIndex:], func(a, b msAdaptiveCardFact) int {
return strings.Compare(a.Title, b.Title)
})
// The card below was built with help from https://adaptivecards.io/designer using the Microsoft Teams host app.
payload := &msAdaptiveCardMessage{
Type: "message",
Attachments: []msAdaptiveCardAttachment{
{
ContentType: "application/vnd.microsoft.card.adaptive",
Content: msAdaptiveCardContent{
Schema: "http://adaptivecards.io/schemas/adaptive-card.json",
Type: "AdaptiveCard",
Version: msAdaptiveCardVersion,
MSTeams: msAdaptiveCardMSTeams{
Width: "Full",
},
Body: []msAdaptiveCardBodyElement{
{
Type: "Container",
msAdaptiveCardContainer: &msAdaptiveCardContainer{
Items: []msAdaptiveCardBodyElement{
{
Type: "TextBlock",
msAdaptiveCardTextBlock: &msAdaptiveCardTextBlock{
Text: objName,
Size: "large",
Weight: "bolder",
Wrap: true,
},
},
{
Type: "TextBlock",
msAdaptiveCardTextBlock: message,
},
{
Type: "FactSet",
msAdaptiveCardFactSet: &msAdaptiveCardFactSet{
Facts: facts,
},
},
},
},
},
},
},
},
},
}
return payload
}
package notifier
import (
"context"
"errors"
"fmt"
"strings"
"github.com/containrrr/shoutrrr"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
type Telegram struct {
Channel string
Token string
send func(url string, message string) error // this allows the send function to be overridden for testing
}
func NewTelegram(channel, token string) (*Telegram, error) {
if channel == "" {
return nil, errors.New("empty Telegram channel")
}
return &Telegram{
Channel: channel,
Token: token,
send: shoutrrr.Send,
}, nil
}
func (t *Telegram) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
emoji := "💫"
if event.Severity == eventv1.EventSeverityError {
emoji = "🚨"
}
heading := fmt.Sprintf("%s %s/%s/%s", emoji, strings.ToLower(event.InvolvedObject.Kind),
event.InvolvedObject.Name, event.InvolvedObject.Namespace)
var metadata string
for k, v := range event.Metadata {
metadata = metadata + fmt.Sprintf("\\- *%s*: %s\n", escapeString(k), escapeString(v))
}
message := fmt.Sprintf("*%s*\n%s\n%s", escapeString(heading), escapeString(event.Message), metadata)
url := fmt.Sprintf("telegram://%s@telegram?channels=%s&parseMode=markDownv2", t.Token, t.Channel)
err := t.send(url, message)
return err
}
// The telegram API requires that some special characters are escaped
// in the message string. Docs: https://core.telegram.org/bots/api#formatting-options.
func escapeString(str string) string {
chars := "\\.-_[]()~>`#+=|{}!*"
for _, char := range chars {
start := 0
idx := 0
for start < len(str) && idx != -1 {
idx = strings.IndexRune(str[start:], char)
if idx != -1 {
newIdx := start + idx
str = str[:newIdx] + `\` + str[newIdx:]
start = newIdx + 2
}
}
}
return str
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"crypto/sha1"
"encoding/base64"
"fmt"
"strings"
"unicode"
"unicode/utf8"
"github.com/fluxcd/pkg/git"
giturls "github.com/chainguard-dev/git-urls"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
func parseGitAddress(s string) (string, string, error) {
u, err := giturls.Parse(s)
if err != nil {
return "", "", fmt.Errorf("failed parsing URL %q: %w", s, err)
}
scheme := u.Scheme
if u.Scheme == "ssh" {
scheme = "https"
}
id := strings.TrimLeft(u.Path, "/")
id = strings.TrimSuffix(id, ".git")
host := fmt.Sprintf("%s://%s", scheme, u.Host)
return host, id, nil
}
func formatNameAndDescription(event eventv1.Event) (string, string) {
name := fmt.Sprintf("%v/%v", event.InvolvedObject.Kind, event.InvolvedObject.Name)
name = strings.ToLower(name)
desc := strings.Join(splitCamelcase(event.Reason), " ")
desc = strings.ToLower(desc)
return name, desc
}
func splitCamelcase(src string) (entries []string) {
// don't split invalid utf8
if !utf8.ValidString(src) {
return []string{src}
}
entries = []string{}
var runes [][]rune
lastClass := 0
class := 0
// split into fields based on class of unicode character
for _, r := range src {
switch true {
case unicode.IsLower(r):
class = 1
case unicode.IsUpper(r):
class = 2
case unicode.IsDigit(r):
class = 3
default:
class = 4
}
if class == lastClass {
runes[len(runes)-1] = append(runes[len(runes)-1], r)
} else {
runes = append(runes, []rune{r})
}
lastClass = class
}
// handle upper case -> lower case sequences, e.g.
// "PDFL", "oader" -> "PDF", "Loader"
for i := 0; i < len(runes)-1; i++ {
if unicode.IsUpper(runes[i][0]) && unicode.IsLower(runes[i+1][0]) {
runes[i+1] = append([]rune{runes[i][len(runes[i])-1]}, runes[i+1]...)
runes[i] = runes[i][:len(runes[i])-1]
}
}
// construct []string from results
for _, s := range runes {
if len(s) > 0 {
entries = append(entries, string(s))
}
}
return
}
func parseRevision(rev string) (string, error) {
hash := git.ExtractHashFromRevision(git.TransformRevision(rev))
if hash.Algorithm() == git.HashTypeUnknown {
return "", fmt.Errorf("failed to extract commit hash from '%s' revision", rev)
}
return hash.String(), nil
}
func sha1String(str string) string {
bs := []byte(str)
return fmt.Sprintf("%x", sha1.Sum(bs))
}
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
func strPtr(s string) *string {
return &s
}
func int64Ptr(i int64) *int64 {
return &i
}
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"crypto/x509"
"fmt"
"net/url"
"strings"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
)
//
// General steps on how to hook up Flux notifications to a Webex space:
// From the Webex App UI:
// - create a Webex space where you want notifications to be sent
// - add the bot email address to the Webex space (see next section)
//
// Register to https://developer.webex.com/, after signing in:
// - create a bot for forwarding FluxCD notifications to a Webex Space (User profile icon|MyWebexApps|Create a New App|Create a Bot)
// - make a note of the bot email address, this email needs to be added to the Webex space
// - generate a bot access token, this is the ID to use in the webex provider manifest token field
// - find the room ID associated to the webex space using https://developer.webex.com/docs/api/v1/rooms/list-rooms
// - this is the ID to use in the webex provider manifest channel field
//
// Webex holds the hook URL
type Webex struct {
// mandatory: this should be set to the universal webex API server https://webexapis.com/v1/messages
URL string
// mandatory: webex room ID, specifies on which webex space notifications must be sent
RoomId string
// mandatory: webex bot access token, this access token must be generated after creating a webex bot
Token string
// optional: use a proxy as needed
ProxyURL string
// optional: x509 cert is no longer needed to post to a webex space
CertPool *x509.CertPool
}
// WebexPayload holds the message text
type WebexPayload struct {
RoomId string `json:"roomId,omitempty"`
Markdown string `json:"markdown,omitempty"`
}
// NewWebex validates the Webex URL and returns a Webex object
func NewWebex(hookURL, proxyURL string, certPool *x509.CertPool, channel string, token string) (*Webex, error) {
_, err := url.ParseRequestURI(hookURL)
if err != nil {
return nil, fmt.Errorf("invalid Webex hook URL %s: '%w'", hookURL, err)
}
return &Webex{
URL: hookURL,
ProxyURL: proxyURL,
CertPool: certPool,
RoomId: channel,
Token: token,
}, nil
}
func (s *Webex) CreateMarkdown(event *eventv1.Event) string {
var b strings.Builder
emoji := "✅"
if event.Severity == eventv1.EventSeverityError {
emoji = "💣"
}
fmt.Fprintf(&b, "%s **%s/%s.%s**\n", emoji, strings.ToLower(event.InvolvedObject.Kind), event.InvolvedObject.Name, event.InvolvedObject.Namespace)
fmt.Fprintf(&b, "%s\n", event.Message)
if len(event.Metadata) > 0 {
for k, v := range event.Metadata {
fmt.Fprintf(&b, ">**%s**: %s\n", k, v)
}
}
return b.String()
}
// Post Webex message
func (s *Webex) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}
payload := WebexPayload{
RoomId: s.RoomId,
Markdown: s.CreateMarkdown(&event),
}
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Add("Authorization", "Bearer "+s.Token)
}),
}
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/getter.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/apis/meta"
)
// Getter interface defines methods that a Kubernetes resource object should implement in order to use the conditions
// package for getting conditions.
type Getter interface {
client.Object
meta.ObjectWithConditions
}
// Get returns the condition with the given type, if the condition does not exists, it returns nil.
func Get(from Getter, t string) *metav1.Condition {
conditions := from.GetConditions()
if conditions == nil {
return nil
}
for _, condition := range conditions {
if condition.Type == t {
return &condition
}
}
return nil
}
// Has returns true if a condition with the given type exists.
func Has(from Getter, t string) bool {
return Get(from, t) != nil
}
// HasAny returns true if a condition with any of the given types exist.
func HasAny(from Getter, t []string) bool {
for _, ct := range t {
if Has(from, ct) {
return true
}
}
return false
}
// HasAnyReason returns true if a condition with the given
// type exists and any of the given reasons exist.
func HasAnyReason(from Getter, t string, r ...string) bool {
for _, reason := range r {
if GetReason(from, t) == reason {
return true
}
}
return false
}
// IsTrue is true if the condition with the given type is True, otherwise it is false if the condition is not True or if
// the condition does not exist (is nil).
func IsTrue(from Getter, t string) bool {
if c := Get(from, t); c != nil {
return c.Status == metav1.ConditionTrue
}
return false
}
// IsFalse is true if the condition with the given type is False, otherwise it is false if the condition is not False or
// if the condition does not exist (is nil).
func IsFalse(from Getter, t string) bool {
if c := Get(from, t); c != nil {
return c.Status == metav1.ConditionFalse
}
return false
}
// IsUnknown is true if the condition with the given type is Unknown or if the condition does not exist (is nil).
func IsUnknown(from Getter, t string) bool {
if c := Get(from, t); c != nil {
return c.Status == metav1.ConditionUnknown
}
return true
}
// IsReady is true if IsStalled and IsReconciling are False, and meta.ReadyCondition is True, otherwise it is false if
// the condition is not True or if it does not exist (is nil).
func IsReady(from Getter) bool {
return !IsStalled(from) && !IsReconciling(from) && IsTrue(from, meta.ReadyCondition)
}
// IsStalled is true if meta.StalledCondition is True and meta.ReconcilingCondition is False or does not exist,
// otherwise it is false.
func IsStalled(from Getter) bool {
return !IsTrue(from, meta.ReconcilingCondition) && IsTrue(from, meta.StalledCondition)
}
// IsReconciling is true if meta.ReconcilingCondition is True and meta.StalledCondition is False or does not exist,
// otherwise it is false.
func IsReconciling(from Getter) bool {
return !IsTrue(from, meta.StalledCondition) && IsTrue(from, meta.ReconcilingCondition)
}
// GetReason returns a nil safe string of Reason for the condition with the given type.
func GetReason(from Getter, t string) string {
if c := Get(from, t); c != nil {
return c.Reason
}
return ""
}
// GetMessage returns a nil safe string of Message for the condition with the given type.
func GetMessage(from Getter, t string) string {
if c := Get(from, t); c != nil {
return c.Message
}
return ""
}
// GetLastTransitionTime returns the LastTransitionType or nil if the condition does not exist (is nil).
func GetLastTransitionTime(from Getter, t string) *metav1.Time {
if c := Get(from, t); c != nil {
return &c.LastTransitionTime
}
return nil
}
// GetObservedGeneration returns a nil safe int64 of ObservedGeneration for the condition with the given type.
func GetObservedGeneration(from Getter, t string) int64 {
if c := Get(from, t); c != nil {
return c.ObservedGeneration
}
return 0
}
// summary returns a condition with the summary of all the conditions existing on an object. If the object does not have
// other conditions, no summary condition is generated.
func summary(from Getter, t string, options ...MergeOption) *metav1.Condition {
conditions := from.GetConditions()
mergeOpt := &mergeOptions{}
for _, o := range options {
o(mergeOpt)
}
// Identifies the conditions in scope for the Summary by taking all the existing conditions except t,
// or, if a list of conditions types is specified, only the conditions the condition in that list.
conditionsInScope := make([]localizedCondition, 0, len(conditions))
for i := range conditions {
c := conditions[i]
if c.Type == t {
continue
}
if mergeOpt.conditionTypes != nil {
found := false
for _, tt := range mergeOpt.conditionTypes {
if c.Type == tt {
found = true
break
}
}
if !found {
continue
}
}
conditionsInScope = append(conditionsInScope, localizedCondition{
Condition: &c,
Getter: from,
})
}
// If it is required to add a step counter only if a subset of condition exists, check if the conditions
// in scope are included in this subset or not.
if mergeOpt.addStepCounterIfOnlyConditionTypes != nil {
for _, c := range conditionsInScope {
found := false
for _, tt := range mergeOpt.addStepCounterIfOnlyConditionTypes {
if c.Type == tt {
found = true
break
}
}
if !found {
mergeOpt.addStepCounter = false
break
}
}
}
// If it is required to add a step counter, determine the total number of conditions defaulting
// to the selected conditions or, if defined, to the total number of conditions type to be considered.
if mergeOpt.addStepCounter {
mergeOpt.stepCounter = len(conditionsInScope)
if mergeOpt.conditionTypes != nil {
mergeOpt.stepCounter = len(mergeOpt.conditionTypes)
}
if mergeOpt.addStepCounterIfOnlyConditionTypes != nil {
mergeOpt.stepCounter = len(mergeOpt.addStepCounterIfOnlyConditionTypes)
}
}
return merge(conditionsInScope, t, mergeOpt)
}
// mirrorOptions allows to set options for the mirror operation.
type mirrorOptions struct {
fallbackTo *bool
fallbackReason string
fallbackMessage string
}
// MirrorOptions defines an option for mirroring conditions.
type MirrorOptions func(*mirrorOptions)
// WithFallbackValue specify a fallback value to use in case the mirrored condition does not exists; in case the
// fallbackValue is false, given values for reason and message will be used.
func WithFallbackValue(fallbackValue bool, reason string, message string) MirrorOptions {
return func(c *mirrorOptions) {
c.fallbackTo = &fallbackValue
c.fallbackReason = reason
c.fallbackMessage = message
}
}
// mirror mirrors the Ready condition from a dependent object into the target condition; if the Ready condition does not
// exists in the source object, no target conditions is generated.
func mirror(from Getter, targetCondition string, options ...MirrorOptions) *metav1.Condition {
mirrorOpt := &mirrorOptions{}
for _, o := range options {
o(mirrorOpt)
}
condition := Get(from, meta.ReadyCondition)
if mirrorOpt.fallbackTo != nil && condition == nil {
switch *mirrorOpt.fallbackTo {
case true:
condition = TrueCondition(targetCondition, mirrorOpt.fallbackReason, "%s", mirrorOpt.fallbackMessage)
case false:
condition = FalseCondition(targetCondition, mirrorOpt.fallbackReason, "%s", mirrorOpt.fallbackMessage)
}
}
if condition != nil {
condition.Type = targetCondition
}
return condition
}
// aggregate the conditions from a list of depending objects into the target object; the condition scope can be set
// using WithConditions; if none of the source objects have the conditions within the scope, no target condition is
// generated.
func aggregate(from []Getter, targetCondition string, options ...MergeOption) *metav1.Condition {
mergeOpt := &mergeOptions{
stepCounter: len(from),
}
for _, o := range options {
o(mergeOpt)
}
conditionsInScope := make([]localizedCondition, 0, len(from))
for i := range from {
conditions := from[i].GetConditions()
for i, _ := range conditions {
c := conditions[i]
if mergeOpt.conditionTypes != nil {
found := false
for _, tt := range mergeOpt.conditionTypes {
if c.Type == tt {
found = true
break
}
}
if !found {
continue
}
}
conditionsInScope = append(conditionsInScope, localizedCondition{
Condition: &c,
Getter: from[i],
})
}
}
// If it is required to add a counter only if a subset of condition exists, check if the conditions
// in scope are included in this subset or not.
if mergeOpt.addCounterOnlyIfConditionTypes != nil {
for _, c := range conditionsInScope {
found := false
for _, tt := range mergeOpt.addCounterOnlyIfConditionTypes {
if c.Type == tt {
found = true
break
}
}
if !found {
mergeOpt.addCounter = false
break
}
}
}
// If it is required to add a source ref only if a condition type exists, check if the conditions
// in scope are included in this subset or not.
if mergeOpt.addSourceRefIfConditionTypes != nil {
for _, c := range conditionsInScope {
found := false
for _, tt := range mergeOpt.addSourceRefIfConditionTypes {
if c.Type == tt {
found = true
break
}
}
if found {
mergeOpt.addSourceRef = true
break
}
}
}
return merge(conditionsInScope, targetCondition, mergeOpt)
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/matcher.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"fmt"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// MatchConditions returns a custom matcher to check equality of a metav1.Condition slice, the condition messages are
// checked for a subset string match.
func MatchConditions(expected []metav1.Condition) types.GomegaMatcher {
return &matchConditions{
expected: expected,
}
}
type matchConditions struct {
expected []metav1.Condition
}
func (m matchConditions) Match(actual interface{}) (success bool, err error) {
elems := []interface{}{}
for _, condition := range m.expected {
elems = append(elems, MatchCondition(condition))
}
return ConsistOf(elems).Match(actual)
}
func (m matchConditions) FailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%#v\nto match\n\t%#v\n", actual, m.expected)
}
func (m matchConditions) NegatedFailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%#v\nto not match\n\t%#v\n", actual, m.expected)
}
// MatchCondition returns a custom matcher to check equality of metav1.Condition.
func MatchCondition(expected metav1.Condition) types.GomegaMatcher {
return &matchCondition{
expected: expected,
}
}
type matchCondition struct {
expected metav1.Condition
}
func (m matchCondition) Match(actual interface{}) (success bool, err error) {
actualCondition, ok := actual.(metav1.Condition)
if !ok {
return false, fmt.Errorf("actual should be of type Condition")
}
ok, err = Equal(m.expected.Type).Match(actualCondition.Type)
if !ok {
return ok, err
}
ok, err = Equal(m.expected.Status).Match(actualCondition.Status)
if !ok {
return ok, err
}
ok, err = Equal(m.expected.Reason).Match(actualCondition.Reason)
if !ok {
return ok, err
}
ok, err = ContainSubstring(m.expected.Message).Match(actualCondition.Message)
if !ok {
return ok, err
}
return ok, err
}
func (m matchCondition) FailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%#v\nto match\n\t%#v\n", actual, m.expected)
}
func (m matchCondition) NegatedFailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%#v\nto not match\n\t%#v\n", actual, m.expected)
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/matchers.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"errors"
"github.com/onsi/gomega/format"
"github.com/onsi/gomega/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// HaveSameStateOf returns a custom matcher to check equality of a metav1.Condition, the condition message is checked
// for a subset string match.
func HaveSameStateOf(expected *metav1.Condition) types.GomegaMatcher {
return &ConditionMatcher{
Expected: expected,
}
}
type ConditionMatcher struct {
Expected *metav1.Condition
}
func (matcher *ConditionMatcher) Match(actual interface{}) (success bool, err error) {
actualCondition, ok := actual.(*metav1.Condition)
if !ok {
return false, errors.New("value should be a condition")
}
return hasSameState(actualCondition, matcher.Expected), nil
}
func (matcher *ConditionMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, "to have the same state of", matcher.Expected)
}
func (matcher *ConditionMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, "not to have the same state of", matcher.Expected)
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/merge.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"sort"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// localizedCondition defines a condition with the information of the object the conditions was originated from.
type localizedCondition struct {
*metav1.Condition
Getter
}
// merge a list of condition into a single one.
// This operation is designed to ensure visibility of the most relevant conditions for defining the operational state of
// a component. E.g. If there is one error in the condition list, this one takes priority over the other conditions and
// it is should be reflected in the target condition.
//
// More specifically:
// 1. Conditions are grouped by status, polarity and observed generation (optional).
// 2. The resulting condition groups are sorted according to the following priority:
// - P0 - Status=True, NegativePolarity=True
// - P1 - Status=False, NegativePolarity=False
// - P2 - Status=True, NegativePolarity=False
// - P3 - Status=False, NegativePolarity=True
// - P4 - Status=Unknown
//
// 3. The group with highest priority is used to determine status, and other info of the target condition.
// 4. If the polarity of the highest priority and target priority differ, it is inverted.
// 5. If the observed generation is considered, the condition groups with the latest generation get the highest
// priority.
//
// Please note that the last operation includes also the task of computing the Reason and the Message for the target
// condition; in order to complete such task some trade-off should be made, because there is no a golden rule for
// summarizing many Reason/Message into single Reason/Message. mergeOptions allows the user to adapt this process to the
// specific needs by exposing a set of merge strategies.
func merge(conditions []localizedCondition, targetCondition string, options *mergeOptions) *metav1.Condition {
g := getConditionGroups(conditions, options)
if len(g) == 0 {
return nil
}
topGroup := g.TopGroup()
targetReason := getReason(g, options)
targetMessage := getMessage(g, options)
targetNegativePolarity := stringInSlice(options.negativePolarityConditionTypes, targetCondition)
switch topGroup.status {
case metav1.ConditionTrue:
// Inverse the negative polarity if the target condition has positive polarity.
if topGroup.negativePolarity != targetNegativePolarity {
return FalseCondition(targetCondition, targetReason, "%s", targetMessage)
}
return TrueCondition(targetCondition, targetReason, "%s", targetMessage)
case metav1.ConditionFalse:
// Inverse the negative polarity if the target condition has positive polarity.
if topGroup.negativePolarity != targetNegativePolarity {
return TrueCondition(targetCondition, targetReason, "%s", targetMessage)
}
return FalseCondition(targetCondition, targetReason, "%s", targetMessage)
default:
return UnknownCondition(targetCondition, targetReason, "%s", targetMessage)
}
}
// getConditionGroups groups a list of conditions according to status values and polarity.
// Additionally, the resulting groups are sorted by mergePriority.
func getConditionGroups(conditions []localizedCondition, options *mergeOptions) conditionGroups {
groups := conditionGroups{}
for _, condition := range conditions {
if condition.Condition == nil {
continue
}
added := false
for i := range groups {
if groups[i].status == condition.Status &&
groups[i].negativePolarity == stringInSlice(options.negativePolarityConditionTypes, condition.Type) {
// If withLatestGeneration is true, add to group only if the generation match.
if options.withLatestGeneration && groups[i].generation != condition.ObservedGeneration {
continue
}
groups[i].conditions = append(groups[i].conditions, condition)
added = true
break
}
}
if !added {
groups = append(groups, conditionGroup{
conditions: []localizedCondition{condition},
status: condition.Status,
negativePolarity: stringInSlice(options.negativePolarityConditionTypes, condition.Type),
generation: condition.ObservedGeneration,
})
}
}
// If withLatestGeneration is true, form a conditionGroups of the groups
// with the latest generation.
if options.withLatestGeneration {
latestGen := groups.latestGeneration()
latestGroups := conditionGroups{}
for _, g := range groups {
if g.generation == latestGen {
latestGroups = append(latestGroups, g)
}
}
groups = latestGroups
}
// sort groups by priority
sort.Sort(groups)
// sorts conditions in the TopGroup so we ensure predictable result for merge strategies.
// condition are sorted using the same lexicographic order used by Set; in case two conditions
// have the same type, condition are sorted using according to the alphabetical order of the source object name.
if len(groups) > 0 {
sort.Slice(groups[0].conditions, func(i, j int) bool {
a := groups[0].conditions[i]
b := groups[0].conditions[j]
if a.Type != b.Type {
return lexicographicLess(a.Condition, b.Condition)
}
return a.GetName() < b.GetName()
})
}
return groups
}
// conditionGroups provides supports for grouping a list of conditions to be merged into a single condition.
// ConditionGroups can be sorted by mergePriority.
type conditionGroups []conditionGroup
func (g conditionGroups) Len() int {
return len(g)
}
func (g conditionGroups) Less(i, j int) bool {
return g[i].mergePriority() < g[j].mergePriority()
}
func (g conditionGroups) Swap(i, j int) {
g[i], g[j] = g[j], g[i]
}
// TopGroup returns the the condition group with the highest mergePriority.
func (g conditionGroups) TopGroup() *conditionGroup {
if len(g) == 0 {
return nil
}
return &g[0]
}
// TruePositivePolarityGroup returns the the condition group with status True/Positive, if any.
func (g conditionGroups) TruePositivePolarityGroup() *conditionGroup {
if g.Len() == 0 {
return nil
}
for _, group := range g {
if !group.negativePolarity && group.status == metav1.ConditionTrue {
return &group
}
}
return nil
}
// latestGeneration returns the latest generation of the conditionGroups.
func (g conditionGroups) latestGeneration() int64 {
var max int64
for _, group := range g {
if group.generation > max {
max = group.generation
}
}
return max
}
// conditionGroup defines a group of conditions with the same metav1.ConditionStatus, polarity and observed generation, and thus with the
// same priority when merging into a condition.
type conditionGroup struct {
status metav1.ConditionStatus
negativePolarity bool
conditions []localizedCondition
generation int64
}
// mergePriority provides a priority value for the status and polarity tuple that identifies this condition group. The
// mergePriority value allows an easier sorting of conditions groups.
func (g conditionGroup) mergePriority() (p int) {
switch g.status {
case metav1.ConditionTrue:
p = 0
if !g.negativePolarity {
p = 2
}
return
case metav1.ConditionFalse:
p = 1
if g.negativePolarity {
p = 3
}
return
case metav1.ConditionUnknown:
return 4
default:
return 99
}
}
func stringInSlice(s []string, val string) bool {
for _, s := range s {
if s == val {
return true
}
}
return false
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/merge_strategies.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"fmt"
"strings"
)
// mergeOptions allows to set strategies for merging a set of conditions into a single condition, and more specifically
// for computing the target Reason and the target Message.
type mergeOptions struct {
conditionTypes []string
negativePolarityConditionTypes []string
addSourceRef bool
addSourceRefIfConditionTypes []string
addCounter bool
addCounterOnlyIfConditionTypes []string
addStepCounter bool
addStepCounterIfOnlyConditionTypes []string
stepCounter int
withLatestGeneration bool
}
// MergeOption defines an option for computing a summary of conditions.
type MergeOption func(*mergeOptions)
// WithConditions instructs merge about the condition types to consider when doing a merge operation; if this option is
// not specified, all the conditions (except Ready, Stalled, and Reconciling) will be considered. This is required so we
// can provide some guarantees about the semantic of the target condition without worrying about side effects if someone
// or something adds custom conditions to the objects.
//
// NOTE: The order of conditions types defines the priority for determining the Reason and Message for the target
// condition.
// IMPORTANT: This options works only while generating a Summary or Aggregated condition.
func WithConditions(t ...string) MergeOption {
return func(c *mergeOptions) {
c.conditionTypes = t
}
}
// WithNegativePolarityConditions instructs merge about the condition types that adhere to a "normal-false" or
// "abnormal-true" pattern, i.e. that conditions are present with a value of True whenever something unusual happens.
//
// NOTE: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties
// IMPORTANT: This option works only while generating the Summary or Aggregated condition.
func WithNegativePolarityConditions(t ...string) MergeOption {
return func(c *mergeOptions) {
c.negativePolarityConditionTypes = t
}
}
// WithCounter instructs merge to add a "x of y Type" string to the message, where x is the number of conditions in the
// top group, y is the number of objects in scope, and Type is the top group condition type.
func WithCounter() MergeOption {
return func(c *mergeOptions) {
c.addCounter = true
}
}
// WithCounterIfOnly ensures a counter is show only if a subset of condition exists.
// This may apply when you want to use a step counter while reconciling the resource, but then want to move away from
// this notation as soon as the resource has been reconciled, and e.g. a health check condition is generated.
//
// IMPORTANT: This options requires WithStepCounter or WithStepCounterIf to be set.
// IMPORTANT: This option works only while generating the Aggregated condition.
func WithCounterIfOnly(t ...string) MergeOption {
return func(c *mergeOptions) {
c.addCounterOnlyIfConditionTypes = t
}
}
// WithStepCounter instructs merge to add a "x of y completed" string to the message, where x is the number of
// conditions with Status=true and y is the number of conditions in scope.
//
// IMPORTANT: This option works only while generating the Summary or Aggregated condition.
func WithStepCounter() MergeOption {
return func(c *mergeOptions) {
c.addStepCounter = true
}
}
// WithStepCounterIf adds a step counter if the value is true.
// This can be used e.g. to add a step counter only if the object is not being deleted.
//
// IMPORTANT: This option works only while generating the Summary or Aggregated condition.
func WithStepCounterIf(value bool) MergeOption {
return func(c *mergeOptions) {
c.addStepCounter = value
}
}
// WithStepCounterIfOnly ensures a step counter is show only if a subset of condition exists.
// This may apply when you want to use a step counter while reconciling the resource, but then want to move away from
// this notation as soon as the resource has been reconciled, and e.g. a health check condition is generated.
//
// IMPORTANT: This options requires WithStepCounter or WithStepCounterIf to be set.
// IMPORTANT: This option works only while generating the Summary or Aggregated condition.
func WithStepCounterIfOnly(t ...string) MergeOption {
return func(c *mergeOptions) {
c.addStepCounterIfOnlyConditionTypes = t
}
}
// WithSourceRef instructs merge to add info about the originating object to the target Reason and
// in summaries.
func WithSourceRef() MergeOption {
return func(c *mergeOptions) {
c.addSourceRef = true
}
}
// WithSourceRefIf ensures a source ref is show only if one of the types in the set exists.
func WithSourceRefIf(t ...string) MergeOption {
return func(c *mergeOptions) {
c.addSourceRefIfConditionTypes = t
}
}
// getReason returns the reason to be applied to the condition resulting by merging a set of condition groups.
// The reason is computed according to the given mergeOptions.
func getReason(groups conditionGroups, options *mergeOptions) string {
return getFirstReason(groups, options.conditionTypes, options.addSourceRef)
}
// getFirstReason returns the first reason from the ordered list of conditions in the top group.
// If required, the reason gets localized with the source object reference.
func getFirstReason(g conditionGroups, order []string, addSourceRef bool) string {
if condition := getFirstCondition(g, order); condition != nil {
reason := condition.Reason
if addSourceRef {
return localizeReason(reason, condition.Getter)
}
return reason
}
return ""
}
// localizeReason adds info about the originating object to the target Reason.
func localizeReason(reason string, from Getter) string {
if strings.Contains(reason, "@") {
return reason
}
return fmt.Sprintf("%s @ %s/%s", reason, from.GetObjectKind().GroupVersionKind().Kind, from.GetName())
}
// getMessage returns the message to be applied to the condition resulting by merging a set of condition groups.
// The message is computed according to the given mergeOptions, but in case of errors or warning a summary of existing
// errors is automatically added.
func getMessage(groups conditionGroups, options *mergeOptions) string {
if options.addStepCounter {
return getStepCounterMessage(groups, options.stepCounter)
}
if options.addCounter {
return getCounterMessage(groups, options.stepCounter)
}
return getFirstMessage(groups, options.conditionTypes)
}
// getCounterMessage returns a "x of y <Type>", where x is the number of conditions in the top group, y is the number
// passed to this method and <Type> is the condition type of the top group.
func getCounterMessage(groups conditionGroups, to int) string {
topGroup := groups.TopGroup()
if topGroup == nil {
return fmt.Sprintf("%d of %d", 0, to)
}
ct := len(topGroup.conditions)
return fmt.Sprintf("%d of %d %s", ct, to, topGroup.conditions[0].Type)
}
// getStepCounterMessage returns a message "x of y completed", where x is the number of conditions with Status=True and
// Polarity=Positive and y is the number passed to this method.
func getStepCounterMessage(groups conditionGroups, to int) string {
ct := 0
if trueGroup := groups.TruePositivePolarityGroup(); trueGroup != nil {
ct = len(trueGroup.conditions)
}
return fmt.Sprintf("%d of %d completed", ct, to)
}
// getFirstMessage returns the message from the ordered list of conditions in the top group.
func getFirstMessage(groups conditionGroups, order []string) string {
if condition := getFirstCondition(groups, order); condition != nil {
return condition.Message
}
return ""
}
// getFirstCondition returns a first condition from the ordered list of conditions in the top group.
func getFirstCondition(g conditionGroups, priority []string) *localizedCondition {
topGroup := g.TopGroup()
if topGroup == nil {
return nil
}
switch len(topGroup.conditions) {
case 0:
return nil
case 1:
return &topGroup.conditions[0]
default:
for _, p := range priority {
for _, c := range topGroup.conditions {
if c.Type == p {
return &c
}
}
}
return &topGroup.conditions[0]
}
}
// WithLatestGeneration instructs merge to consider the conditions with the
// latest observed generation only.
func WithLatestGeneration() MergeOption {
return func(c *mergeOptions) {
c.withLatestGeneration = true
}
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/patch.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"reflect"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Patch defines a list of operations to change a list of conditions into another.
type Patch []PatchOperation
// PatchOperation define an operation that changes a single condition.
type PatchOperation struct {
Before *metav1.Condition
After *metav1.Condition
Op PatchOperationType
}
// PatchOperationType defines patch operation types.
type PatchOperationType string
const (
// AddConditionPatch defines an add condition patch operation.
AddConditionPatch PatchOperationType = "Add"
// ChangeConditionPatch defines an change condition patch operation.
ChangeConditionPatch PatchOperationType = "Change"
// RemoveConditionPatch defines a remove condition patch operation.
RemoveConditionPatch PatchOperationType = "Remove"
)
// NewPatch returns the list of Patch required to align source conditions to after conditions.
func NewPatch(before Getter, after Getter) Patch {
var patch Patch
// Identify AddCondition and ModifyCondition changes.
targetConditions := after.GetConditions()
for i := range targetConditions {
targetCondition := targetConditions[i]
currentCondition := Get(before, targetCondition.Type)
if currentCondition == nil {
patch = append(patch, PatchOperation{Op: AddConditionPatch, After: &targetCondition})
continue
}
if !reflect.DeepEqual(&targetCondition, currentCondition) {
patch = append(patch, PatchOperation{Op: ChangeConditionPatch, After: &targetCondition, Before: currentCondition})
}
}
// Identify RemoveCondition changes.
baseConditions := before.GetConditions()
for i := range baseConditions {
baseCondition := baseConditions[i]
targetCondition := Get(after, baseCondition.Type)
if targetCondition == nil {
patch = append(patch, PatchOperation{Op: RemoveConditionPatch, Before: &baseCondition})
}
}
return patch
}
// applyOptions allows to set strategies for patch apply.
type applyOptions struct {
ownedConditions []string
forceOverwrite bool
}
func (o *applyOptions) isOwnedCondition(t string) bool {
for _, i := range o.ownedConditions {
if i == t {
return true
}
}
return false
}
// ApplyOption defines an option for applying a condition patch.
type ApplyOption func(*applyOptions)
// WithOwnedConditions allows to define condition types owned by the controller.
// In case of conflicts for the owned conditions, the patch helper will always use the value provided by the controller.
func WithOwnedConditions(t ...string) ApplyOption {
return func(c *applyOptions) {
c.ownedConditions = t
}
}
// WithForceOverwrite instructs the patch helper to always use the value provided by the controller in case of conflicts
// for the owned conditions.
func WithForceOverwrite(v bool) ApplyOption {
return func(c *applyOptions) {
c.forceOverwrite = v
}
}
// Apply executes a three-way merge of a list of Patch.
// When merge conflicts are detected (latest deviated from before in an incompatible way), an error is returned.
func (p Patch) Apply(latest Setter, options ...ApplyOption) error {
if len(p) == 0 {
return nil
}
applyOpt := &applyOptions{}
for _, o := range options {
o(applyOpt)
}
for _, conditionPatch := range p {
switch conditionPatch.Op {
case AddConditionPatch:
// If the conditions is owned, always keep the after value.
if applyOpt.forceOverwrite || applyOpt.isOwnedCondition(conditionPatch.After.Type) {
Set(latest, conditionPatch.After)
continue
}
// If the condition is already on latest, check if latest and after agree on the change; if not, this is a conflict.
if latestCondition := Get(latest, conditionPatch.After.Type); latestCondition != nil {
// If latest and after agree on the change, then it is a conflict.
if !hasSameState(latestCondition, conditionPatch.After) {
return errors.Errorf("error patching conditions: The condition %q was modified by a different process and this caused a merge/AddCondition conflict: %v", conditionPatch.After.Type, cmp.Diff(latestCondition, conditionPatch.After))
}
// otherwise, the latest is already as intended.
// NOTE: We are preserving LastTransitionTime from the latest in order to avoid altering the existing value.
continue
}
// If the condition does not exist on the latest, add the new after condition.
Set(latest, conditionPatch.After)
case ChangeConditionPatch:
// If the conditions is owned, always keep the after value.
if applyOpt.forceOverwrite || applyOpt.isOwnedCondition(conditionPatch.After.Type) {
Set(latest, conditionPatch.After)
continue
}
latestCondition := Get(latest, conditionPatch.After.Type)
// If the condition does not exist anymore on the latest, this is a conflict.
if latestCondition == nil {
return errors.Errorf("error patching conditions: The condition %q was deleted by a different process and this caused a merge/ChangeCondition conflict", conditionPatch.After.Type)
}
// If the condition on the latest is different from the base condition, check if
// the after state corresponds to the desired value. If not this is a conflict (unless we should ignore conflicts for this condition type).
if !reflect.DeepEqual(latestCondition, conditionPatch.Before) {
if !hasSameState(latestCondition, conditionPatch.After) {
return errors.Errorf("error patching conditions: The condition %q was modified by a different process and this caused a merge/ChangeCondition conflict: %v", conditionPatch.After.Type, cmp.Diff(latestCondition, conditionPatch.After))
}
// Otherwise the latest is already as intended.
// NOTE: We are preserving LastTransitionTime from the latest in order to avoid altering the existing value.
continue
}
// Otherwise apply the new after condition.
Set(latest, conditionPatch.After)
case RemoveConditionPatch:
// If the conditions is owned, always keep the after value (condition should be deleted).
if applyOpt.forceOverwrite || applyOpt.isOwnedCondition(conditionPatch.Before.Type) {
Delete(latest, conditionPatch.Before.Type)
continue
}
// If the condition is still on the latest, check if it is changed in the meantime;
// if so then this is a conflict.
if latestCondition := Get(latest, conditionPatch.Before.Type); latestCondition != nil {
if !hasSameState(latestCondition, conditionPatch.Before) {
return errors.Errorf("error patching conditions: The condition %q was modified by a different process and this caused a merge/RemoveCondition conflict: %v", conditionPatch.Before.Type, cmp.Diff(latestCondition, conditionPatch.Before))
}
}
// Otherwise the latest and after agreed on the delete operation, so there's nothing to change.
Delete(latest, conditionPatch.Before.Type)
}
}
return nil
}
// IsZero returns true if the patch has no changes.
func (p Patch) IsZero() bool {
return len(p) == 0
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/setter.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"fmt"
"sort"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/pkg/apis/meta"
)
// Setter is an interface that defines methods a Kubernetes object should implement in order to
// use the conditions package for setting conditions.
type Setter interface {
Getter
meta.ObjectWithConditionsSetter
}
// Set sets the given condition.
//
// NOTE: If a condition already exists, the LastTransitionTime is updated only if a change is detected in any of the
// following fields: Status, Reason, and Message. The ObservedGeneration is always updated.
func Set(to Setter, condition *metav1.Condition) {
if to == nil || condition == nil {
return
}
// Always set the observed generation on the condition.
condition.ObservedGeneration = to.GetGeneration()
// Trim the message to the maximum accepted length.
condition.Message = trimConditionMessage(condition.Message, maxMessageLength)
// Check if the new conditions already exists, and change it only if there is a status
// transition (otherwise we should preserve the current last transition time)-
conditions := to.GetConditions()
exists := false
for i := range conditions {
existingCondition := conditions[i]
if existingCondition.Type == condition.Type {
exists = true
if !hasSameState(&existingCondition, condition) {
condition.LastTransitionTime = metav1.NewTime(time.Now().UTC().Truncate(time.Second))
conditions[i] = *condition
break
}
condition.LastTransitionTime = existingCondition.LastTransitionTime
// For new observed generations, update the condition to have the
// new generation, preserving the last transition time.
if existingCondition.ObservedGeneration != condition.ObservedGeneration {
conditions[i] = *condition
}
break
}
}
// If the condition does not exist, add it, setting the transition time only if not already set
if !exists {
if condition.LastTransitionTime.IsZero() {
condition.LastTransitionTime = metav1.NewTime(time.Now().UTC().Truncate(time.Second))
}
conditions = append(conditions, *condition)
}
// Sort conditions for convenience of the consumer, i.e. kubectl.
sort.Slice(conditions, func(i, j int) bool {
return lexicographicLess(&conditions[i], &conditions[j])
})
to.SetConditions(conditions)
}
// TrueCondition returns a condition with Status=True and the given type, reason and message.
func TrueCondition(t, reason, messageFormat string, messageArgs ...interface{}) *metav1.Condition {
return &metav1.Condition{
Type: t,
Status: metav1.ConditionTrue,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageArgs...),
}
}
// FalseCondition returns a condition with Status=False and the given type, reason and message.
func FalseCondition(t, reason, messageFormat string, messageArgs ...interface{}) *metav1.Condition {
return &metav1.Condition{
Type: t,
Status: metav1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageArgs...),
}
}
// UnknownCondition returns a condition with Status=Unknown and the given type, reason and message.
func UnknownCondition(t, reason, messageFormat string, messageArgs ...interface{}) *metav1.Condition {
return &metav1.Condition{
Type: t,
Status: metav1.ConditionUnknown,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageArgs...),
}
}
// MarkTrue sets Status=True for the condition with the given type, reason and message.
func MarkTrue(to Setter, t, reason, messageFormat string, messageArgs ...interface{}) {
Set(to, TrueCondition(t, reason, messageFormat, messageArgs...))
}
// MarkUnknown sets Status=Unknown for the condition with the given type, reason and message.
func MarkUnknown(to Setter, t, reason, messageFormat string, messageArgs ...interface{}) {
Set(to, UnknownCondition(t, reason, messageFormat, messageArgs...))
}
// MarkFalse sets Status=False for the condition with the given type, reason and message.
func MarkFalse(to Setter, t, reason, messageFormat string, messageArgs ...interface{}) {
Set(to, FalseCondition(t, reason, messageFormat, messageArgs...))
}
// MarkReconciling sets meta.ReconcilingCondition=True with the given reason and message, and deletes the
// meta.StalledCondition. This is normally called at the beginning of a reconcile run for an object.
// For more information about the condition types, see the kstatus spec:
// https://github.com/kubernetes-sigs/cli-utils/blob/e351b2bc43cec2107ba1d874c3dec54fd0956c59/pkg/kstatus/README.md#conditions
func MarkReconciling(to Setter, reason, messageFormat string, messageArgs ...interface{}) {
Delete(to, meta.StalledCondition)
MarkTrue(to, meta.ReconcilingCondition, reason, messageFormat, messageArgs...)
}
// MarkStalled sets meta.StalledCondition=True with the given reason and message, and deletes the
// meta.ReconcilingCondition. This is normally deferred and conditionally called at the end of a reconcile run for an
// object. A common approach is to mark the object stalled if the object is not requeued as a reconcile result.
// For more information about the condition types, see the kstatus spec:
// https://github.com/kubernetes-sigs/cli-utils/blob/e351b2bc43cec2107ba1d874c3dec54fd0956c59/pkg/kstatus/README.md#conditions
func MarkStalled(to Setter, reason, messageFormat string, messageArgs ...interface{}) {
Delete(to, meta.ReconcilingCondition)
MarkTrue(to, meta.StalledCondition, reason, messageFormat, messageArgs...)
}
// SetSummary creates a new summary condition with the summary of all the conditions existing on an object.
// If the object does not have other conditions, no summary condition is generated.
func SetSummary(to Setter, targetCondition string, options ...MergeOption) {
Set(to, summary(to, targetCondition, options...))
}
// SetMirror creates a new condition by mirroring the the Ready condition from a dependent object;
// if the Ready condition does not exists in the source object, no target conditions is generated.
func SetMirror(to Setter, targetCondition string, from Getter, options ...MirrorOptions) {
Set(to, mirror(from, targetCondition, options...))
}
// SetAggregate creates a new condition with the aggregation of all the conditions from a list of dependency objects,
// or a subset using WithConditions; if none of the source objects have a condition within the scope of the merge
// operation, no target condition is generated.
func SetAggregate(to Setter, targetCondition string, from []Getter, options ...MergeOption) {
Set(to, aggregate(from, targetCondition, options...))
}
// Delete deletes the condition with the given type.
func Delete(to Setter, t string) {
if to == nil {
return
}
conditions := to.GetConditions()
newConditions := make([]metav1.Condition, 0, len(conditions))
for _, condition := range conditions {
if condition.Type != t {
newConditions = append(newConditions, condition)
}
}
to.SetConditions(newConditions)
}
// conditionWeights defines the weight of condition types that have priority in lexicographicLess.
var conditionWeights = map[string]int{
meta.StalledCondition: 0,
meta.ReconcilingCondition: 1,
meta.ReadyCondition: 2,
}
// lexicographicLess returns true if a condition is less than another in regard to the order of conditions
// designed for convenience of the consumer, i.e. kubectl. The condition types in conditionWeights always go first,
// sorted by their defined weight, followed by all the other conditions sorted by highest observedGeneration and
// lexicographically by Type.
func lexicographicLess(i, j *metav1.Condition) bool {
w1, ok1 := conditionWeights[i.Type]
w2, ok2 := conditionWeights[j.Type]
switch {
case ok1 && ok2:
return w1 < w2
case ok1, ok2:
return !ok2
case i.ObservedGeneration == j.ObservedGeneration:
return i.Type < j.Type
default:
return i.ObservedGeneration > j.ObservedGeneration
}
}
// hasSameState returns true if a condition has the same state of another; state is defined by the union of following
// fields: Type, Status, Reason, and Message (it excludes LastTransitionTime and ObservedGeneration).
func hasSameState(i, j *metav1.Condition) bool {
return i.Type == j.Type &&
i.Status == j.Status &&
i.Reason == j.Reason &&
i.Message == j.Message
}
const (
maxMessageLength = 32768
trimmedMessageSuffix = "..."
)
// trimConditionMessage trims the condition message to the specified maximum length.
func trimConditionMessage(msg string, maxLength int) string {
if maxLength < len(trimmedMessageSuffix) {
maxLength = len(trimmedMessageSuffix)
}
if len(msg) <= maxLength {
return msg
}
trimmedMsg := msg[:maxLength-len(trimmedMessageSuffix)] + trimmedMessageSuffix
return trimmedMsg
}
/*
Copyright 2020 The Kubernetes Authors.
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This file is modified from the source at
https://github.com/kubernetes-sigs/cluster-api/tree/7478817225e0a75acb6e14fc7b438231578073d2/util/conditions/unstructured.go,
and initially adapted to work with the `metav1.Condition` and `metav1.ConditionStatus` types.
More concretely, this includes the removal of "condition severity" related functionalities, as this is not supported by
the `metav1.Condition` type.
*/
package conditions
import (
"encoding/json"
"fmt"
"strings"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
)
var (
ErrUnstructuredFieldNotFound = fmt.Errorf("field not found")
)
// UnstructuredGetter return a Getter object that can read conditions from an Unstructured object.
//
// IMPORTANT: This method should be used only with types implementing status conditions with a metav1.Condition type.
func UnstructuredGetter(u *unstructured.Unstructured) Getter {
return &unstructuredWrapper{Unstructured: u}
}
// UnstructuredSetter return a Setter object that can set conditions from an Unstructured object.
//
// IMPORTANT: This method should be used only with types implementing status conditions with a metav1.Condition type.
func UnstructuredSetter(u *unstructured.Unstructured) Setter {
return &unstructuredWrapper{Unstructured: u}
}
// UnstructuredUnmarshalField is a wrapper around JSON and Unstructured objects to decode and copy a specific field
// value into an object.
func UnstructuredUnmarshalField(u *unstructured.Unstructured, v interface{}, fields ...string) error {
value, found, err := unstructured.NestedFieldNoCopy(u.Object, fields...)
if err != nil {
return errors.Wrapf(err, "failed to retrieve field %q from %q", strings.Join(fields, "."), u.GroupVersionKind())
}
if !found || value == nil {
return ErrUnstructuredFieldNotFound
}
valueBytes, err := json.Marshal(value)
if err != nil {
return errors.Wrapf(err, "failed to json-encode field %q value from %q", strings.Join(fields, "."), u.GroupVersionKind())
}
if err := json.Unmarshal(valueBytes, v); err != nil {
return errors.Wrapf(err, "failed to json-decode field %q value from %q", strings.Join(fields, "."), u.GroupVersionKind())
}
return nil
}
type unstructuredWrapper struct {
*unstructured.Unstructured
}
// GetConditions returns the list of conditions from an Unstructured object.
//
// NOTE: Due to the constraints of JSON-unmarshal, this operation is to be considered best effort.
// In more details:
// - Errors during JSON-unmarshal are ignored and a empty collection list is returned.
// - It's not possible to detect if the object has an empty condition list or if it does not implement conditions;
// in both cases the operation returns an empty slice.
// - If the object doesn't implement status conditions as defined in GitOps Toolkit API,
// JSON-unmarshal matches incoming object keys to the keys; this can lead to to conditions values partially set.
func (c *unstructuredWrapper) GetConditions() []metav1.Condition {
conditions := []metav1.Condition{}
if err := UnstructuredUnmarshalField(c.Unstructured, &conditions, "status", "conditions"); err != nil {
return nil
}
return conditions
}
// SetConditions set the conditions into an Unstructured object.
//
// NOTE: Due to the constraints of JSON-unmarshal, this operation is to be considered best effort.
// In more details:
// - Errors during JSON-unmarshal are ignored and a empty collection list is returned.
// - It's not possible to detect if the object has an empty condition list or if it does not implement conditions;
// in both cases the operation returns an empty slice is returned.
func (c *unstructuredWrapper) SetConditions(conditions []metav1.Condition) {
v := make([]interface{}, 0, len(conditions))
for i := range conditions {
m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&conditions[i])
if err != nil {
log.Log.Error(err, "Failed to convert Condition to unstructured map. This error shouldn't have occurred, please file an issue.", "groupVersionKind", c.GroupVersionKind(), "name", c.GetName(), "namespace", c.GetNamespace())
continue
}
v = append(v, m)
}
// unstructured.SetNestedField returns an error only if value cannot be set because one of
// the nesting levels is not a map[string]interface{}; this is not the case so the error should never happen here.
err := unstructured.SetNestedField(c.Unstructured.Object, v, "status", "conditions")
if err != nil {
log.Log.Error(err, "Failed to set Conditions on unstructured object. This error shouldn't have occurred, please file an issue.", "groupVersionKind", c.GroupVersionKind(), "name", c.GetName(), "namespace", c.GetNamespace())
}
}
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tls
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
corev1 "k8s.io/api/core/v1"
)
const (
ClientCertIdentifier = "certFile"
ClientKeyIdentifier = "keyFile"
CACertIdentifier = "caFile"
)
// ConfigFromSecret returns a TLS config created from the content of the secret.
// An error is returned if the secret does not contain a ClientCertIdentifier and ClientKeyIdentifier, or a
// CACertIdentifier.
func ConfigFromSecret(certSecret *corev1.Secret) (*tls.Config, error) {
validSecret := false
tlsConfig := &tls.Config{}
clientCert, clientCertOk := certSecret.Data[ClientCertIdentifier]
clientKey, clientKeyOk := certSecret.Data[ClientKeyIdentifier]
if clientKeyOk != clientCertOk {
return nil, fmt.Errorf("found one of %s or %s, and expected both or neither", ClientCertIdentifier, ClientKeyIdentifier)
}
if clientCertOk && clientKeyOk {
validSecret = true
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return nil, err
}
tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
}
if caCert, ok := certSecret.Data[CACertIdentifier]; ok {
validSecret = true
sysCerts, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
sysCerts.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = sysCerts
}
if !validSecret {
return nil, fmt.Errorf("no %s and %s, or %s found in secret", ClientCertIdentifier, ClientKeyIdentifier, CACertIdentifier)
}
return tlsConfig, nil
}
// TransportFromSecret returns a HTTP transport with a TLS config created from the content of the secret.
// An error is returned if the secret does not contain a ClientCertIdentifier and ClientKeyIdentifier, or a
// CACertIdentifier.
func TransportFromSecret(certSecret *corev1.Secret) (*http.Transport, error) {
tlsConfig, err := ConfigFromSecret(certSecret)
if err != nil {
return nil, err
}
return &http.Transport{TLSClientConfig: tlsConfig}, nil
}