package utils
import (
"fmt"
"time"
"github.com/golang-jwt/jwt"
"github.com/sirupsen/logrus"
)
// GenerateOAuthJWT issues an HS512-signed JWT whose only claim is an expiry
// timestamp OAuthJWTExpDuration minutes from now, signed with OAuthJwtSecret.
func GenerateOAuthJWT() (string, error) {
	token := jwt.New(jwt.SigningMethodHS512)
	expiry := time.Now().Add(time.Duration(OAuthJWTExpDuration) * time.Minute)
	token.Claims.(jwt.MapClaims)["exp"] = expiry.Unix()
	signed, err := token.SignedString([]byte(OAuthJwtSecret))
	if err != nil {
		logrus.Info(err)
		return "", err
	}
	return signed, nil
}
// ValidateOAuthJWT checks the signature and validity of the given JWT and
// reports whether it is acceptable for OAuth flows. Only HMAC-family signing
// methods are accepted, preventing algorithm-confusion attacks.
func ValidateOAuthJWT(tokenString string) (bool, error) {
	token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
		if _, isValid := token.Method.(*jwt.SigningMethodHMAC); !isValid {
			return nil, fmt.Errorf("invalid token %s", token.Header["alg"])
		}
		return []byte(OAuthJwtSecret), nil
	})
	if err != nil {
		return false, err
	}
	// Bug fix: the original condition `!ok && !token.Valid` could never be
	// true (parsed claims always satisfy jwt.Claims), and it returned a nil
	// error for the failure case. Check validity directly instead.
	if !token.Valid {
		return false, fmt.Errorf("invalid token")
	}
	return true, nil
}
package utils
import (
"crypto/tls"
"crypto/x509"
"os"
"strconv"
log "github.com/sirupsen/logrus"
)
var (
	// Initial admin credentials used to seed the auth service.
	AdminName = os.Getenv("ADMIN_USERNAME")
	AdminPassword = os.Getenv("ADMIN_PASSWORD")
	// Mongo connection settings.
	DBUrl = os.Getenv("DB_SERVER")
	DBUser = os.Getenv("DB_USER")
	DBPassword = os.Getenv("DB_PASSWORD")
	// Token lifetimes in minutes (defaults: 24h login tokens, 5m OAuth tokens).
	JWTExpiryDuration = getEnvAsInt("JWT_EXPIRY_MINS", 1440)
	OAuthJWTExpDuration = getEnvAsInt("OAUTH_JWT_EXP_MINS", 5)
	OAuthJwtSecret = os.Getenv("OAUTH_SECRET")
	// Dex OIDC settings, used only when DexEnabled is true.
	DexEnabled = getEnvAsBool("DEX_ENABLED", false)
	DexCallBackURL = os.Getenv("DEX_OAUTH_CALLBACK_URL")
	DexClientID = os.Getenv("DEX_OAUTH_CLIENT_ID")
	DexClientSecret = os.Getenv("DEX_OAUTH_CLIENT_SECRET")
	DexOIDCIssuer = os.Getenv("OIDC_ISSUER")
	// Internal mTLS settings consumed by GetTlsConfig.
	EnableInternalTls = getEnvAsBool("ENABLE_INTERNAL_TLS", false)
	TlsCertPath = os.Getenv("TLS_CERT_PATH")
	TlSKeyPath = os.Getenv("TLS_KEY_PATH")
	CaCertPath = os.Getenv("CA_CERT_TLS_PATH")
	// Listen ports for the REST and gRPC servers.
	RestPort = os.Getenv("REST_PORT")
	GrpcPort = os.Getenv("GRPC_PORT")
	// Mongo database, collection, and field names used by the auth service.
	DBName = "auth"
	UserCollection = "users"
	ProjectCollection = "project"
	AuthConfigCollection = "auth-config"
	RevokedTokenCollection = "revoked-token"
	ApiTokenCollection = "api-token"
	UsernameField = "username"
	ExpiresAtField = "expires_at"
	// bcrypt cost factor for password hashing.
	PasswordEncryptionCost = 8
	// Fallback GraphQL gRPC endpoint when the env vars are unset.
	DefaultLitmusGqlGrpcEndpoint = "localhost"
	DefaultLitmusGqlGrpcPort = ":8000"
	//DefaultLitmusGqlGrpcPortHttps = ":8001" // enable when in use
)
// getEnvAsInt reads the named environment variable as an integer, falling
// back to defaultVal when the variable is unset or not a valid integer.
func getEnvAsInt(name string, defaultVal int) int {
	raw := os.Getenv(name)
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return defaultVal
	}
	return parsed
}
// getEnvAsBool reads the named environment variable as a boolean (accepting
// the forms understood by strconv.ParseBool), falling back to defaultVal
// when the variable is unset or malformed.
func getEnvAsBool(name string, defaultVal bool) bool {
	// Cleanup: the original shadowed the string value with the parsed bool
	// under the same identifier `valueStr`; distinct names keep intent clear.
	if parsed, err := strconv.ParseBool(os.Getenv(name)); err == nil {
		return parsed
	}
	return defaultVal
}
// GetTlsConfig builds a server-side TLS configuration that requires and
// verifies client certificates (mutual TLS). It reads the CA certificate
// from CaCertPath and the server key pair from TlsCertPath/TlSKeyPath,
// terminating the process on any failure since the server cannot start
// securely without them.
func GetTlsConfig() *tls.Config {
	// read ca's cert, verify to client's certificate
	caPem, err := os.ReadFile(CaCertPath)
	if err != nil {
		log.Fatal(err)
	}
	// create cert pool and append ca's cert
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(caPem) {
		// Bug fix: the original logged `err` here, which is always nil at
		// this point; AppendCertsFromPEM reports failure only via its bool.
		log.Fatalf("failed to append CA certificate from %s to cert pool", CaCertPath)
	}
	// read server cert & key
	serverCert, err := tls.LoadX509KeyPair(TlsCertPath, TlSKeyPath)
	if err != nil {
		log.Fatal(err)
	}
	// configuring TLS config based on provided certificates & keys to
	conf := &tls.Config{
		Certificates: []tls.Certificate{serverCert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	}
	return conf
}
package utils
import (
"context"
"os"
grpc2 "github.com/litmuschaos/litmus/chaoscenter/authentication/api/presenter/protos"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// GetProjectGRPCSvcClient returns an RPC client for Project service
// along with the (newly dialed) connection the caller must close.
func GetProjectGRPCSvcClient(conn *grpc.ClientConn) (grpc2.ProjectClient, *grpc.ClientConn) {
	endpoint := os.Getenv("LITMUS_GQL_GRPC_ENDPOINT")
	if endpoint == "" {
		endpoint = DefaultLitmusGqlGrpcEndpoint
	}
	port := os.Getenv("LITMUS_GQL_GRPC_PORT")
	if port == "" {
		port = DefaultLitmusGqlGrpcPort
	}
	// Block until the connection is established; a failure here is fatal.
	conn, err := grpc.Dial(endpoint+port, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		logrus.Fatalf("did not connect: %s", err)
	}
	return grpc2.NewProjectClient(conn), conn
}
// ProjectInitializer initializes a new project with default hub and image registry
func ProjectInitializer(ctx context.Context, client grpc2.ProjectClient, projectID string, role string) error {
	req := &grpc2.ProjectInitializationRequest{
		ProjectID: projectID,
		Role:      role,
	}
	_, err := client.InitializeProject(ctx, req)
	return err
}
package utils
import (
"context"
"strings"
"time"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// MongoConnection creates a connection to the mongo server at DBUrl,
// authenticating with DBUser/DBPassword.
func MongoConnection() (*mongo.Client, error) {
	// Bound the connection handshake to 10s. Bug fix: the original discarded
	// the CancelFunc (`ctx, _ :=`), which go vet flags as a context leak.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	mongoCredentials := options.Credential{
		Username: DBUser,
		Password: DBPassword,
	}
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(DBUrl).SetAuth(mongoCredentials))
	if err != nil {
		return nil, err
	}
	return client, nil
}
// CreateIndex creates a unique index for the given field in the collectionName
func CreateIndex(collectionName string, field string, db *mongo.Database) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Unique ascending index on the requested field.
	indexModel := mongo.IndexModel{
		Keys:    bson.M{field: 1},
		Options: options.Index().SetUnique(true),
	}
	if _, err := db.Collection(collectionName).Indexes().CreateOne(ctx, indexModel); err != nil {
		log.Error(err)
		return err
	}
	return nil
}
// CreateTTLIndex creates a TTL index for the given field in the collectionName
func CreateTTLIndex(collectionName string, db *mongo.Database) error {
	// more info: https://www.mongodb.com/docs/manual/tutorial/expire-data/#expire-documents-at-a-specific-clock-time
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// ExpireAfterSeconds(0) expires each document at the clock time stored
	// in its ExpiresAtField value.
	ttlModel := mongo.IndexModel{
		Keys:    bson.M{ExpiresAtField: 1},
		Options: options.Index().SetExpireAfterSeconds(0),
	}
	if _, err := db.Collection(collectionName).Indexes().CreateOne(ctx, ttlModel); err != nil {
		log.Error(err)
		return err
	}
	return nil
}
// CreateCollection creates a new mongo collection if it does not exist
func CreateCollection(collectionName string, db *mongo.Database) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.CreateCollection(ctx, collectionName); err != nil {
		// Treat "already exists" as success so startup stays idempotent.
		if !strings.Contains(err.Error(), "already exists") {
			return err
		}
		log.Info(collectionName + "'s collection already exists, continuing with the existing mongo collection")
		return nil
	}
	log.Info(collectionName + "'s mongo collection created")
	return nil
}
package utils
import (
crypto "crypto/rand"
"encoding/base64"
"fmt"
"regexp"
"strings"
)
// SanitizeString trims the string input
func SanitizeString(input string) string {
	trimmed := strings.TrimSpace(input)
	return trimmed
}
/*
ValidateStrictPassword represents and checks for the following patterns:
- Input is at least 8 characters long and at most 16 characters long
- Input contains at least one special character of these @$!%*?_&#
- Input contains at least one digit
- Input contains at least one uppercase alphabet
- Input contains at least one lowercase alphabet
*/
func ValidateStrictPassword(input string) error {
	if len(input) < 8 {
		return fmt.Errorf("password length is less than 8 characters")
	}
	if len(input) > 16 {
		return fmt.Errorf("password length is more than 16 characters")
	}
	// Improvement: the original compiled four regular expressions on every
	// call and folded compile errors into misleading "missing class"
	// messages; direct character scans are simpler and cannot fail.
	if !strings.ContainsAny(input, "0123456789") {
		return fmt.Errorf("password does not contain digits")
	}
	hasLower, hasUpper := false, false
	for _, r := range input {
		switch {
		case r >= 'a' && r <= 'z':
			hasLower = true
		case r >= 'A' && r <= 'Z':
			hasUpper = true
		}
	}
	if !hasLower {
		return fmt.Errorf("password does not contain lowercase alphabets")
	}
	if !hasUpper {
		return fmt.Errorf("password does not contain uppercase alphabets")
	}
	if !strings.ContainsAny(input, "@$!%*?_&#") {
		return fmt.Errorf("password does not contain special characters")
	}
	return nil
}
// RandomString generates random strings, can be used to create ids
func RandomString(n int) (string, error) {
	// Guard clause: a non-positive length is a caller error.
	if n <= 0 {
		return "", fmt.Errorf("length should be greater than 0")
	}
	buf := make([]byte, n)
	if _, err := crypto.Read(buf); err != nil {
		return "", err
	}
	return base64.URLEncoding.EncodeToString(buf), nil
}
// ValidateStrictUsername enforces the username policy:
//   - must start with a letter               - ^[a-zA-Z]
//   - then letters, digits, '_' or '-'       - [a-zA-Z0-9_-]
//   - total length 3-16 (first char matched above) - {2,15}$
func ValidateStrictUsername(username string) error {
	if matched, _ := regexp.MatchString(`^[a-zA-Z][a-zA-Z0-9_-]{2,15}$`, username); !matched {
		// Bug fix: the previous message claimed only letters, numbers and
		// underscores were allowed, contradicting the pattern (hyphens are
		// valid) and omitting the start-with-letter and length rules.
		return fmt.Errorf("username must start with a letter, be 3-16 characters long, and contain only letters, numbers, underscores, and hyphens")
	}
	return nil
}
package authorization
import (
"context"
"net/http"
"strings"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
// contextKey is a private string type for context keys set by this package,
// avoiding collisions with keys defined elsewhere.
type contextKey string
const (
	// AuthKey carries the raw JWT extracted by Middleware.
	AuthKey = contextKey("authorization")
	// UserClaim is the context key for user claims.
	// NOTE(review): not referenced in this chunk — presumably set/read by
	// other files in the package; confirm before removing.
	UserClaim = contextKey("user-claims")
	// BearerSchema is the Authorization header prefix stripped by Middleware.
	BearerSchema = "Bearer "
	// CookieName names a cookie that may carry the token (used elsewhere).
	CookieName = "token"
)
// Middleware verifies jwt and checks if user has enough privilege to access route (no roles' info needed)
func Middleware(handler http.Handler, mongoClient *mongo.Client) gin.HandlerFunc {
	return func(c *gin.Context) {
		jwt := ""
		// Token is read from the Authorization header only; requests without
		// it proceed with an empty token (rejected downstream, presumably by
		// the wrapped handler — confirm).
		if c.Request.Header.Get("Authorization") != "" {
			jwt = c.Request.Header.Get("Authorization")
		}
		// Strip the "Bearer " schema prefix when present.
		if strings.HasPrefix(jwt, BearerSchema) {
			jwt = jwt[len(BearerSchema):]
		}
		// Reject tokens explicitly revoked (e.g. after logout). Note that
		// IsRevokedToken consults mongo on every request.
		if IsRevokedToken(jwt, mongoClient) {
			c.Writer.WriteHeader(http.StatusUnauthorized)
			c.Writer.Write([]byte("Error verifying JWT token: Token is revoked"))
			return
		}
		// Stash the raw token and request headers on the context for the
		// wrapped handler. NOTE(review): "request-header" is a plain string
		// key (go vet warns about collision risk); consumers elsewhere must
		// use the same literal — confirm before changing it to a typed key.
		ctx := context.WithValue(c.Request.Context(), AuthKey, jwt)
		ctx1 := context.WithValue(ctx, "request-header", c.Request.Header)
		c.Request = c.Request.WithContext(ctx1)
		handler.ServeHTTP(c.Writer, c.Request)
	}
}
// IsRevokedToken checks if the given JWT Token is revoked by looking it up in
// the revoked-token collection. Unexpected database errors are treated as
// "revoked" (fail closed).
func IsRevokedToken(tokenString string, mongoClient *mongo.Client) bool {
	collection := mongoClient.Database("auth").Collection("revoked-token")
	err := collection.FindOne(context.Background(), bson.M{
		"token": tokenString,
	}).Err()
	if err == nil {
		// A matching document exists: the token has been revoked.
		return true
	}
	if err == mongo.ErrNoDocuments {
		return false
	}
	// Bug fix: the original returned false for *any* error, so a lost DB
	// connection let revoked tokens through. Fail closed instead.
	return true
}
package authorization
import (
"context"
"errors"
"fmt"
"log"
"github.com/litmuschaos/litmus/chaoscenter/graphql/server/pkg/database/mongodb"
"github.com/litmuschaos/litmus/chaoscenter/graphql/server/pkg/database/mongodb/authConfig"
"github.com/golang-jwt/jwt"
)
// UserValidateJWT validates the cluster jwt
func UserValidateJWT(token string, salt string) (jwt.MapClaims, error) {
	keyFunc := func(t *jwt.Token) (interface{}, error) {
		// Accept HMAC-signed tokens only.
		if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("invalid token %s", t.Header["alg"])
		}
		return []byte(salt), nil
	}
	parsed, err := jwt.Parse(token, keyFunc)
	if err != nil {
		log.Print("USER JWT ERROR: ", err)
		return nil, errors.New("invalid Token")
	}
	if !parsed.Valid {
		return nil, errors.New("invalid Token")
	}
	if claims, ok := parsed.Claims.(jwt.MapClaims); ok {
		return claims, nil
	}
	return nil, errors.New("invalid Token")
}
// GetUsername returns the username from the jwt token, using the stored
// auth-config salt as the verification key.
func GetUsername(token string) (string, error) {
	salt, err := authConfig.NewAuthConfigOperator(mongodb.Operator).GetAuthConfig(context.Background())
	if err != nil {
		return "", err
	}
	tkn, err := jwt.Parse(token, func(token *jwt.Token) (interface{}, error) {
		// Consistency fix: like UserValidateJWT, reject non-HMAC algorithms
		// so a caller cannot downgrade the signing method.
		if _, isValid := token.Method.(*jwt.SigningMethodHMAC); !isValid {
			return nil, fmt.Errorf("invalid token %s", token.Header["alg"])
		}
		return []byte(salt.Value), nil
	})
	if err != nil {
		log.Print("USER JWT ERROR: ", err)
		return "", errors.New("invalid Token")
	}
	claims, ok := tkn.Claims.(jwt.MapClaims)
	if !ok {
		return "", errors.New("invalid Token")
	}
	// Bug fix: the original asserted claims["username"].(string) without the
	// comma-ok form, panicking when the claim was absent or not a string.
	username, ok := claims["username"].(string)
	if !ok {
		return "", errors.New("invalid Token")
	}
	return username, nil
}
package authorization
import (
"context"
"errors"
"github.com/litmuschaos/litmus/chaoscenter/graphql/server/pkg/grpc"
"github.com/sirupsen/logrus"
grpc2 "google.golang.org/grpc"
)
// ValidateRole Validates the role of a user in a given project
func ValidateRole(ctx context.Context, projectID string,
	requiredRoles []string, invitation string) error {
	token := ctx.Value(AuthKey).(string)
	// Dial the auth gRPC service; the connection is closed on return.
	var conn *grpc2.ClientConn
	client, conn := grpc.GetAuthGRPCSvcClient(conn)
	defer conn.Close()
	if err := grpc.ValidatorGRPCRequest(client, token, projectID,
		requiredRoles,
		invitation); err != nil {
		logrus.Error(err)
		return errors.New("permission_denied: " + err.Error())
	}
	return nil
}
package handler
import (
"archive/zip"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/mrz1836/go-sanitize"
"github.com/gin-gonic/gin"
"github.com/litmuschaos/litmus/chaoscenter/graphql/server/graph/model"
chaoshubops "github.com/litmuschaos/litmus/chaoscenter/graphql/server/pkg/chaoshub/ops"
"github.com/litmuschaos/litmus/chaoscenter/graphql/server/pkg/database/mongodb/chaos_hub"
"github.com/litmuschaos/litmus/chaoscenter/graphql/server/utils"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
// DefaultPath is the root directory under which ChaosHubs are cloned and extracted.
const DefaultPath = "/tmp/"
// GetChartsPath is used to construct path for given chart.
func GetChartsPath(chartsInput model.CloningInput, projectID string, isDefault bool) string {
	// Default hubs live under "default/", project hubs under the project ID.
	owner := projectID
	if isDefault {
		owner = "default"
	}
	return DefaultPath + owner + "/" + chartsInput.Name + "/faults/"
}
// GetChartsData is used to get details of charts like experiments.
func GetChartsData(chartsPath string) ([]*model.Chart, error) {
	entries, err := os.ReadDir(chartsPath)
	if err != nil {
		log.Error("file reading error", err)
		return nil, err
	}
	var allChartsDetails []ChaosChart
	for _, entry := range entries {
		if entry.Name() == "icons" {
			continue
		}
		// Best-effort: a chart whose CSV cannot be read contributes its zero value.
		chart, _ := ReadExperimentFile(chartsPath + entry.Name() + "/" + entry.Name() + ".chartserviceversion.yaml")
		allChartsDetails = append(allChartsDetails, chart)
	}
	// JSON round-trip converts the YAML-decoded structs to the GraphQL model type.
	encoded, err := json.Marshal(allChartsDetails)
	if err != nil {
		return nil, err
	}
	var charts []*model.Chart
	if err := json.Unmarshal(encoded, &charts); err != nil {
		return nil, err
	}
	return charts, nil
}
// GetExperimentData is used for getting details of selected Experiment path
func GetExperimentData(experimentFilePath string) (*model.Chart, error) {
	experiment, err := ReadExperimentFile(experimentFilePath)
	if err != nil {
		return nil, err
	}
	// JSON round-trip converts the YAML struct into the GraphQL model type.
	encoded, err := json.Marshal(experiment)
	if err != nil {
		return nil, err
	}
	var chart *model.Chart
	if err := json.Unmarshal(encoded, &chart); err != nil {
		return nil, err
	}
	return chart, nil
}
// ReadExperimentFile is used for reading experiment file from given path
// and decoding it into a ChaosChart.
func ReadExperimentFile(path string) (ChaosChart, error) {
	var experiment ChaosChart
	experimentFile, err := os.ReadFile(path)
	if err != nil {
		// Bug fix: the previous message was garbled ("file path of the, err:");
		// report the failing path instead.
		return experiment, fmt.Errorf("failed to read experiment file at %s, err: %+v", path, err)
	}
	if err = yaml.Unmarshal(experimentFile, &experiment); err != nil {
		return experiment, err
	}
	return experiment, nil
}
// ReadExperimentYAMLFile is used for reading experiment/engine file from given path,
// returning its raw contents as a string.
func ReadExperimentYAMLFile(path string) (string, error) {
	var s string
	YAMLData, err := os.ReadFile(path)
	if err != nil {
		// Bug fix: the previous message was garbled ("file path of the, err:");
		// report the failing path instead.
		return s, fmt.Errorf("failed to read YAML file at %s, err: %+v", path, err)
	}
	s = string(YAMLData)
	return s, nil
}
// ListPredefinedWorkflowDetails reads the workflow directory for all the predefined experiments
// and returns the csv, workflow manifest and workflow name
func ListPredefinedWorkflowDetails(name string, projectID string) ([]*model.PredefinedExperimentList, error) {
	experimentsPath := DefaultPath + projectID + "/" + name + "/workflows"
	entries, err := os.ReadDir(experimentsPath)
	if err != nil {
		return nil, err
	}
	var predefinedWorkflows []*model.PredefinedExperimentList
	for _, entry := range entries {
		entryDir := experimentsPath + "/" + entry.Name()
		csvPath := entryDir + "/" + entry.Name() + ".chartserviceversion.yaml"
		exists, err := IsFileExisting(csvPath)
		if err != nil {
			return nil, err
		}
		if !exists {
			continue
		}
		// Unreadable manifests are reported as "na" instead of failing the
		// whole listing.
		csvManifest, err := ReadExperimentYAMLFile(csvPath)
		if err != nil {
			csvManifest = "na"
		}
		workflowManifest, err := ReadExperimentYAMLFile(entryDir + "/" + "workflow.yaml")
		if err != nil {
			workflowManifest = "na"
		}
		predefinedWorkflows = append(predefinedWorkflows, &model.PredefinedExperimentList{
			ExperimentName:     entry.Name(),
			ExperimentManifest: workflowManifest,
			ExperimentCSV:      csvManifest,
		})
	}
	return predefinedWorkflows, nil
}
// IsFileExisting reports whether a file exists at the given path.
// A Stat failure other than "not exist" (e.g. a permission error) is
// returned to the caller rather than being silently treated as "exists".
func IsFileExisting(path string) (bool, error) {
	if _, err := os.Stat(path); err != nil {
		if os.IsNotExist(err) {
			return false, nil
		}
		// Bug fix: the original fell through and returned (true, nil) here,
		// swallowing the error and claiming the file exists.
		return false, err
	}
	return true, nil
}
// DownloadRemoteHub is used to download a remote hub from the url provided by
// the user. The archive is validated against the configured maximum size and
// required content type, extracted under DefaultPath/<projectID>, and the
// temporary zip file removed.
func DownloadRemoteHub(hubDetails model.CreateRemoteChaosHub, projectID string) error {
	dirPath := DefaultPath + projectID
	if err := os.MkdirAll(dirPath, 0755); err != nil {
		return err
	}
	//create the destination directory where the hub will be downloaded
	hubPath := dirPath + "/" + hubDetails.Name + ".zip"
	destDir, err := os.Create(hubPath)
	if err != nil {
		log.Error(err)
		return err
	}
	defer destDir.Close()
	//download the zip file from the provided url
	download, err := http.Get(sanitize.URL(hubDetails.RepoURL))
	if err != nil {
		log.Error(err)
		return err
	}
	defer download.Body.Close()
	if download.StatusCode != http.StatusOK {
		return fmt.Errorf("err: " + download.Status)
	}
	maxSize, err := strconv.Atoi(utils.Config.RemoteHubMaxSize)
	if err != nil {
		return err
	}
	//validate the declared content length (in bytes) when present. Bug fix:
	//the Atoi error was previously unchecked, so a missing or malformed
	//header silently skipped this check; the bounded copy below now enforces
	//the limit regardless (covering chunked responses too).
	if contentLength := download.Header.Get("content-length"); contentLength != "" {
		if length, err := strconv.Atoi(contentLength); err == nil && length > maxSize {
			_ = os.Remove(hubPath)
			return fmt.Errorf("err: File size exceeded the threshold %d", length)
		}
	}
	//validate the content-type
	contentType := download.Header.Get("content-type")
	if contentType != "application/zip" {
		_ = os.Remove(hubPath)
		return fmt.Errorf("err: Invalid file type %s", contentType)
	}
	//copy the downloaded content to the created zip file, reading at most
	//maxSize bytes so a lying or absent Content-Length cannot fill the disk
	written, err := io.Copy(destDir, io.LimitReader(download.Body, int64(maxSize)+1))
	if err != nil {
		log.Error(err)
		return err
	}
	if written > int64(maxSize) {
		_ = os.Remove(hubPath)
		return fmt.Errorf("err: File size exceeded the threshold %d", written)
	}
	//unzip the ChaosHub to the default hub directory
	if err := UnzipRemoteHub(hubPath, projectID); err != nil {
		return err
	}
	//remove the redundant zip file
	return os.Remove(hubPath)
}
// UnzipRemoteHub is used to unzip the zip file
func UnzipRemoteHub(zipPath string, projectID string) error {
extractPath := DefaultPath + projectID
zipReader, err := zip.OpenReader(zipPath)
if err != nil {
log.Error(err)
return err
}
defer func(zipReader *zip.ReadCloser) {
err := zipReader.Close()
if err != nil {
log.Error(err)
}
}(zipReader)
for _, file := range zipReader.File {
err := CopyZipItems(file, extractPath, file.Name)
if err != nil {
return err
}
}
return nil
}
// CopyZipItems is used to copy the content from the extracted zip file to
// the ChaosHub directory, guarding against zip-slip path traversal.
func CopyZipItems(file *zip.File, extractPath string, chartsPath string) error {
	path := filepath.Join(extractPath, chartsPath)
	// Reject entries that would escape the extraction root (zip-slip).
	if !strings.HasPrefix(path, filepath.Clean(extractPath)+string(os.PathSeparator)) {
		return fmt.Errorf("illegal file path: %s", path)
	}
	// Bug fix: the original only logged the errors below and kept going,
	// eventually calling io.Copy with an invalid file handle; each failure
	// now aborts the copy.
	if err := os.MkdirAll(filepath.Dir(path), os.ModeDir|os.ModePerm); err != nil {
		log.Error(err)
		return err
	}
	fileReader, err := file.Open()
	if err != nil {
		log.Error(err)
		return err
	}
	defer fileReader.Close()
	// Directory entries need no content copy.
	if file.FileInfo().IsDir() {
		return nil
	}
	fileCopy, err := os.Create(path)
	if err != nil {
		log.Error(err)
		return err
	}
	defer fileCopy.Close()
	if _, err := io.Copy(fileCopy, fileReader); err != nil {
		log.Error(err)
		return err
	}
	return nil
}
// SyncRemoteRepo is used to sync the remote ChaosHub
func SyncRemoteRepo(hubData model.CloningInput, projectID string) error {
	hubPath := DefaultPath + projectID + "/" + hubData.Name
	// Remove the local copy first so the fresh download starts clean.
	if err := os.RemoveAll(hubPath); err != nil {
		return err
	}
	log.Info("downloading remote hub")
	err := DownloadRemoteHub(model.CreateRemoteChaosHub{
		Name:    hubData.Name,
		RepoURL: hubData.RepoURL,
	}, projectID)
	if err != nil {
		return err
	}
	log.Info("remote hub ", hubData.Name, "downloaded ")
	return nil
}
// ValidateLocalRepository validates the repository directory and checks it by plain opening it.
func ValidateLocalRepository(hub chaos_hub.ChaosHub) (bool, error) {
	// Default hubs live under "default/", project hubs under the project ID.
	owner := hub.ProjectID
	if hub.IsDefault {
		owner = "default"
	}
	repoPath := DefaultPath + owner + "/" + hub.Name
	if err := chaoshubops.GitPlainOpen(repoPath); err != nil {
		return false, err
	}
	return true, nil
}
// ChaosHubIconHandler is used for fetching ChaosHub icons from a custom
// (project-scoped) hub. Route params select the hub, chart, and icon file.
func ChaosHubIconHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		// "predefined" charts keep icons under experiments/; faults keep
		// them under faults/<chartName>/.
		var iconPath string
		if strings.ToLower(c.Param("chartName")) == "predefined" {
			iconPath = utils.Config.CustomChaosHubPath + c.Param("projectId") + "/" + c.Param("hubName") + "/experiments/icons/" + c.Param("iconName")
		} else {
			iconPath = utils.Config.CustomChaosHubPath + c.Param("projectId") + "/" + c.Param("hubName") + "/faults/" + c.Param("chartName") + "/icons/" + c.Param("iconName")
		}
		img, err := os.Open(iconPath)
		if err != nil {
			// Bug fix: the original wrote the error body before the status
			// header (forcing a 200) and then still ran io.Copy on the nil
			// file; report the error and stop.
			log.WithError(err).Error("icon cannot be fetched")
			c.Writer.WriteHeader(http.StatusInternalServerError)
			fmt.Fprint(c.Writer, "icon cannot be fetched, err : "+err.Error())
			return
		}
		defer func() {
			if cerr := img.Close(); cerr != nil {
				log.WithError(cerr).Error("error while closing the file")
			}
		}()
		c.Writer.Header().Set("Content-Type", "image/png")
		c.Writer.WriteHeader(http.StatusOK)
		io.Copy(c.Writer, img)
	}
}
// DefaultChaosHubIconHandler serves icons from the default (shared) ChaosHub.
// Route params select the hub, chart, and icon file.
func DefaultChaosHubIconHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		// "predefined" charts keep icons under experiments/; faults keep
		// them under faults/<chartName>/.
		var iconPath string
		if strings.ToLower(c.Param("chartName")) == "predefined" {
			iconPath = utils.Config.DefaultChaosHubPath + c.Param("hubName") + "/experiments/icons/" + c.Param("iconName")
		} else {
			iconPath = utils.Config.DefaultChaosHubPath + c.Param("hubName") + "/faults/" + c.Param("chartName") + "/icons/" + c.Param("iconName")
		}
		img, err := os.Open(iconPath)
		if err != nil {
			// Bug fix: the original wrote the error body before the status
			// header (forcing a 200) and then still ran io.Copy on the nil
			// file; report the error and stop.
			log.WithError(err).Error("icon cannot be fetched")
			c.Writer.WriteHeader(http.StatusInternalServerError)
			fmt.Fprint(c.Writer, "icon cannot be fetched, err : "+err.Error())
			return
		}
		defer func(img *os.File) {
			if cerr := img.Close(); cerr != nil {
				log.WithError(cerr).Error("error while closing the file")
			}
		}(img)
		c.Writer.Header().Set("Content-Type", "image/png")
		c.Writer.WriteHeader(http.StatusOK)
		io.Copy(c.Writer, img)
	}
}
package events
import (
"context"
"errors"
"fmt"
"strconv"
"subscriber/pkg/types"
wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
chaosTypes "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
"github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned"
litmusV1alpha1 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
"github.com/litmuschaos/chaos-operator/pkg/client/informers/externalversions"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
mergeType "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
)
// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher
func (ev *subscriberEvents) ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
	startTime, err := strconv.Atoi(infraData["START_TIME"])
	if err != nil {
		logrus.WithError(err).Fatal("failed to parse startTime")
	}
	cfg, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		logrus.WithError(err).Fatal("could not get kube config")
	}
	// ClientSet to create Informer
	clientSet, err := versioned.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Fatal("could not generate dynamic client for config")
	}
	// Watch only the standalone-workflow engines belonging to this infra.
	tweak := externalversions.WithTweakListOptions(func(list *v1.ListOptions) {
		list.LabelSelector = fmt.Sprintf("infra_id=%s,type=standalone_workflow", InfraID)
	})
	// Cluster scope by default; namespace scope restricts the informer to
	// the infra's own namespace.
	var factory externalversions.SharedInformerFactory
	if InfraScope == "namespace" {
		factory = externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, tweak, externalversions.WithNamespace(InfraNamespace))
	} else {
		factory = externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, tweak)
	}
	informer := factory.Litmuschaos().V1alpha1().ChaosEngines().Informer()
	go ev.startWatchEngine(stopCh, informer, stream, int64(startTime))
}
// startWatchEngine registers ADD/UPDATE handlers for ChaosEngine events on
// the informer and runs it until stopCh is closed. Deletes are intentionally
// not handled.
func (ev *subscriberEvents) startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
	s.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ev.chaosEventHandler(obj, "ADD", stream, startTime)
		},
		UpdateFunc: func(_, newObj interface{}) {
			ev.chaosEventHandler(newObj, "UPDATE", stream, startTime)
		},
	})
	s.Run(stopCh)
}
// chaosEventHandler extracts the required data from a ChaosEngine event and
// streams a corresponding WorkflowEvent. eventType is "ADD" or "UPDATE" (set
// by startWatchEngine); engines created before startTime (unix millis) are
// ignored.
func (ev *subscriberEvents) chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
	workflowObj := obj.(*chaosTypes.ChaosEngine)
	// Engines without a workflow_id label are not runs tracked by this
	// subscriber; log and skip them.
	if workflowObj.Labels["workflow_id"] == "" {
		logrus.WithFields(map[string]interface{}{
			"uid": string(workflowObj.ObjectMeta.UID),
			"wf_id": workflowObj.Labels["workflow_id"],
			"infra_id": workflowObj.Labels["infra_id"],
		}).Printf("CHAOSENGINE RUN IGNORED [INVALID METADATA]")
		return
	}
	// Skip engines that predate this watcher session.
	if workflowObj.ObjectMeta.CreationTimestamp.UnixMilli() < startTime {
		return
	}
	cfg, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		logrus.WithError(err).Fatal("could not get kube config")
	}
	chaosClient, err := litmusV1alpha1.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Fatal("could not get Chaos ClientSet")
	}
	nodes := make(map[string]types.Node)
	logrus.Print("STANDALONE CHAOSENGINE EVENT ", workflowObj.UID, " ", eventType)
	var cd *types.ChaosData = nil
	// Extract chaos data; a parse failure is logged but the event is still
	// streamed with ChaosExp left nil.
	cd, err = ev.getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
	if err != nil {
		logrus.WithError(err).Print("FAILED PARSING CHAOS ENGINE CRD")
	}
	// Finish time is only known once the engine completed or stopped; it is
	// taken from the first experiment's last update (engines here carry one
	// experiment with the manifest as raw data).
	finTime := int64(-1)
	if workflowObj.Status.EngineStatus == chaosTypes.EngineStatusCompleted || workflowObj.Status.EngineStatus == chaosTypes.EngineStatusStopped {
		if len(workflowObj.Status.Experiments) > 0 {
			finTime = workflowObj.Status.Experiments[0].LastUpdateTime.Unix()
		}
	}
	// Synthesize a two-node tree (a "Steps" root plus the engine node) so the
	// standalone engine renders upstream like a regular workflow.
	nodes[workflowObj.Name] = types.Node{
		Name: workflowObj.Name,
		Phase: "Succeeded",
		StartedAt: StrConvTime(workflowObj.CreationTimestamp.Unix()),
		FinishedAt: StrConvTime(finTime),
		Children: []string{workflowObj.Name + "-engine"},
		Type: "Steps",
	}
	details := types.Node{
		Name: workflowObj.Name,
		Phase: mapStatus(workflowObj.Status.EngineStatus),
		Type: "ChaosEngine",
		StartedAt: StrConvTime(workflowObj.CreationTimestamp.Unix()),
		FinishedAt: StrConvTime(finTime),
		Children: []string{},
		ChaosExp: cd,
		Message: string(workflowObj.Status.EngineStatus),
	}
	nodes[workflowObj.Name+"-engine"] = details
	workflow := types.WorkflowEvent{
		WorkflowType: "chaosengine",
		WorkflowID: workflowObj.Labels["workflow_id"],
		EventType: eventType,
		UID: string(workflowObj.ObjectMeta.UID),
		Namespace: workflowObj.ObjectMeta.Namespace,
		Name: workflowObj.ObjectMeta.Name,
		CreationTimestamp: StrConvTime(workflowObj.ObjectMeta.CreationTimestamp.Unix()),
		Phase: details.Phase,
		Message: details.Message,
		StartedAt: details.StartedAt,
		FinishedAt: details.FinishedAt,
		Nodes: nodes,
	}
	// Publish the assembled event to the stream consumer.
	stream <- workflow
}
// StopChaosEngineState is used to patch all the chaosEngines with engineState=stop
// in the given namespace; when workflowRunID is non-nil, only engines labeled
// with that workflow_run_id are patched.
func (ev *subscriberEvents) StopChaosEngineState(namespace string, workflowRunID *string) error {
	ctx := context.TODO()
	//Define the GVR
	resourceType := schema.GroupVersionResource{
		Group:    "litmuschaos.io",
		Version:  "v1alpha1",
		Resource: "chaosengines",
	}
	//Generate the dynamic client
	_, dynamicClient, err := ev.subscriberK8s.GetDynamicAndDiscoveryClient()
	if err != nil {
		return errors.New("failed to get dynamic client, error: " + err.Error())
	}
	listOption := v1.ListOptions{}
	if workflowRunID != nil {
		listOption.LabelSelector = fmt.Sprintf("workflow_run_id=%s", *workflowRunID)
	}
	//List all chaosEngines present in the particular namespace
	//(consistency fix: reuse ctx instead of a second context.TODO())
	chaosEngines, err := dynamicClient.Resource(resourceType).Namespace(namespace).List(ctx, listOption)
	if err != nil {
		return errors.New("failed to list chaosengines: " + err.Error())
	}
	//For every engine patch the engineState to stop
	for _, val := range chaosEngines.Items {
		patch := []byte(`{"spec":{"engineState":"stop"}}`)
		patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{})
		if err != nil {
			return errors.New("failed to patch chaosengines: " + err.Error())
		}
		if patched != nil {
			logrus.Info("Successfully patched ChaosEngine: ", patched.GetName())
		}
	}
	return nil
}
// StopWorkflow will patch the workflow based on workflow name using the shutdown strategy
func (ev *subscriberEvents) StopWorkflow(wfName string, namespace string) error {
	conf, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		// Bug fix: this error was previously ignored, letting a bad config
		// reach NewForConfigOrDie and crash the subscriber.
		return fmt.Errorf("could not get kube config: %w", err)
	}
	wfClient := wfclientset.NewForConfigOrDie(conf).ArgoprojV1alpha1().Workflows(namespace)
	patch := []byte(`{"spec":{"shutdown":"Stop"}}`)
	wf, err := wfClient.Patch(context.TODO(), wfName, mergeType.MergePatchType, patch, v1.PatchOptions{})
	if err != nil {
		return fmt.Errorf("error in patching workflow: %w", err)
	}
	if wf != nil {
		logrus.Info("Successfully patched workflow: ", wf.GetName())
	}
	return nil
}
// mapStatus translates a ChaosEngine status into the workflow phase string
// reported upstream; unknown statuses are treated as still running.
func mapStatus(status chaosTypes.EngineStatus) string {
	phases := map[chaosTypes.EngineStatus]string{
		chaosTypes.EngineStatusInitialized: "Running",
		chaosTypes.EngineStatusCompleted:   "Succeeded",
		chaosTypes.EngineStatusStopped:     "Stopped",
	}
	if phase, ok := phases[status]; ok {
		return phase
	}
	return "Running"
}
package events
import (
"subscriber/pkg/graphql"
"subscriber/pkg/types"
"subscriber/pkg/k8s"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
)
// SubscriberEvents is the contract for watching, transforming, and
// controlling chaos workflow / ChaosEngine events on this infra.
type SubscriberEvents interface {
	// ChaosEventWatcher starts an informer for standalone ChaosEngine runs.
	ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
	// StopChaosEngineState patches engines in the namespace to engineState=stop.
	StopChaosEngineState(namespace string, workflowRunID *string) error
	// CheckChaosData resolves chaos result data for a workflow node.
	CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error)
	// GetWorkflowObj fetches a workflow by UID.
	GetWorkflowObj(uid string) (*v1alpha1.Workflow, error)
	// ListWorkflowObject lists workflows for the given workflow ID.
	ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error)
	// GenerateWorkflowPayload serializes a workflow event for the server.
	GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error)
	// WorkflowEventWatcher starts an informer for Argo workflow events.
	WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
	// WorkflowEventHandler converts a workflow object into a WorkflowEvent.
	WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error)
	// SendWorkflowUpdates pushes a single event upstream.
	SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error)
	// WorkflowUpdates consumes events from the channel and forwards them.
	WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent)
	// StopWorkflow shuts down a workflow by name via the shutdown strategy.
	StopWorkflow(wfName string, namespace string) error
}
// subscriberEvents is the default SubscriberEvents implementation; it talks
// to the GraphQL server and the Kubernetes API through the injected helpers.
type subscriberEvents struct {
	gqlSubscriberServer graphql.SubscriberGql // GraphQL transport to the control plane
	subscriberK8s       k8s.SubscriberK8s     // kube-config / pod-log access
}
// NewSubscriberEventsOperator wires the GraphQL client and the Kubernetes
// helper together into a ready-to-use SubscriberEvents implementation.
func NewSubscriberEventsOperator(gqlSubscriberServer graphql.SubscriberGql, subscriberK8s k8s.SubscriberK8s) SubscriberEvents {
	ev := subscriberEvents{
		gqlSubscriberServer: gqlSubscriberServer,
		subscriberK8s:       subscriberK8s,
	}
	return &ev
}
package events
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"subscriber/pkg/types"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
v1alpha12 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
)
// getChaosData is a util function that extracts the chaos data for a single
// engine run using the litmus go-client. It fetches the ChaosEngine named
// engineName in engineNS and, when exactly one experiment is present, its
// ChaosResult, folding both into a types.ChaosData.
// Returns (nil, nil) when the engine resource predates the workflow node,
// i.e. it belongs to an earlier run. When the ChaosResult lookup fails, a
// partially-filled ChaosData is returned together with the error.
func (ev *subscriberEvents) getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
	cd := &types.ChaosData{}
	cd.EngineName = engineName
	cd.Namespace = engineNS
	crd, err := chaosClient.ChaosEngines(cd.Namespace).Get(context.Background(), cd.EngineName, v1.GetOptions{})
	if err != nil {
		return nil, err
	}
	// An engine created before this node started belongs to a previous run;
	// log and skip it (not treated as a hard error).
	if nodeStatus.StartedAt.Unix() > crd.ObjectMeta.CreationTimestamp.Unix() {
		logrus.Errorf("chaosengine resource older than current events node | workflow time : %v | engine time : %v", nodeStatus.StartedAt.Unix(), crd.ObjectMeta.CreationTimestamp.Unix())
		return nil, nil
	}
	// Defaults used when no experiment/result data is available yet.
	cd.ProbeSuccessPercentage = "0"
	cd.FailStep = ""
	cd.EngineUID = string(crd.ObjectMeta.UID)
	cd.EngineContext = string(crd.Labels["context"])
	// A stopped engine is reported as a failed experiment.
	if strings.ToLower(string(crd.Status.EngineStatus)) == "stopped" {
		cd.ExperimentVerdict = "Fail"
		cd.ExperimentStatus = string(crd.Status.EngineStatus)
	}
	if len(crd.Status.Experiments) == 0 {
		return cd, nil
	}
	// considering chaosengine will only have 1 experiment
	cd.ExperimentPod = crd.Status.Experiments[0].ExpPod
	cd.RunnerPod = crd.Status.Experiments[0].Runner
	cd.ExperimentStatus = string(crd.Status.EngineStatus)
	cd.ExperimentName = crd.Status.Experiments[0].Name
	cd.LastUpdatedAt = strconv.FormatInt(crd.Status.Experiments[0].LastUpdateTime.Unix(), 10)
	cd.ExperimentVerdict = crd.Status.Experiments[0].Verdict
	// A stopped engine, or a completed engine whose verdict is not "pass",
	// both count as failures.
	if strings.ToLower(string(crd.Status.EngineStatus)) == "stopped" || (strings.ToLower(string(crd.Status.EngineStatus)) == "completed" && strings.ToLower(cd.ExperimentVerdict) != "pass") {
		cd.ExperimentVerdict = "Fail"
		cd.ExperimentStatus = string(crd.Status.EngineStatus)
	}
	if len(crd.Status.Experiments) == 1 {
		// ChaosResult name convention: "<engine>-<experiment>".
		expRes, err := chaosClient.ChaosResults(cd.Namespace).Get(context.Background(), crd.Name+"-"+crd.Status.Experiments[0].Name, v1.GetOptions{})
		if err != nil {
			return cd, err
		}
		// annotations sometimes cause failure in gql message escaping
		expRes.Annotations = nil
		cd.ChaosResult = expRes
		cd.ProbeSuccessPercentage = expRes.Status.ExperimentStatus.ProbeSuccessPercentage
		if expRes.Status.ExperimentStatus.ErrorOutput != nil {
			cd.FailStep = fmt.Sprintf("%s : %s", expRes.Status.ExperimentStatus.ErrorOutput.ErrorCode, expRes.Status.ExperimentStatus.ErrorOutput.Reason)
		}
		// The ChaosResult phase is the most precise status; it overrides
		// the engine-level status set above.
		cd.ExperimentStatus = string(expRes.Status.ExperimentStatus.Phase)
	}
	return cd, nil
}
// CheckChaosData is a util function that checks whether an events node is a
// chaos-exp event — i.e. its raw artifact decodes to a ChaosEngine manifest —
// and, if so, extracts the chaos data for it.
// Returns the (possibly rewritten) node type, the chaos data (nil when the
// node is not a chaos event or is still Pending), and any extraction error.
// NOTE(review): assumes the node has at least one input artifact with raw
// data; callers are expected to have verified this before calling.
func (ev *subscriberEvents) CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
	nodeType := string(nodeStatus.Type)
	var cd *types.ChaosData = nil
	// considering chaos events has only 1 artifact with manifest as raw data
	data := nodeStatus.Inputs.Artifacts[0].Raw.Data
	obj := &unstructured.Unstructured{}
	decUnstructured := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
	_, _, err := decUnstructured.Decode([]byte(data), nil, obj)
	if err == nil && obj.GetKind() == "ChaosEngine" {
		nodeType = "ChaosEngine"
		if nodeStatus.Phase != "Pending" {
			name := obj.GetName()
			// Engines created via generateName get their final name at
			// runtime; recover it from the main container's log.
			if obj.GetGenerateName() != "" {
				log, err := ev.subscriberK8s.GetLogs(nodeStatus.ID, workflowNS, "main")
				if err != nil {
					return nodeType, nil, err
				}
				name = getNameFromLog(log)
				if name == "" {
					return nodeType, nil, errors.New("Chaos-Engine Generated Name couldn't be retrieved")
				}
			}
			cd, err = ev.getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
			return nodeType, cd, err
		}
	}
	// Not a chaos event (or still pending): report the node type unchanged.
	return nodeType, nil, nil
}
// chaosEngineNameRe matches the generated ChaosEngine name printed in the
// experiment pod's log, e.g. "ChaosEngine Name : my-engine-abc12".
// Compiled once at package init instead of on every call.
var chaosEngineNameRe = regexp.MustCompile(`ChaosEngine Name : ([\w-]+)`)

// getNameFromLog extracts the generated ChaosEngine name from the main
// container log. It returns "" when the marker is not present.
func getNameFromLog(log string) string {
	match := chaosEngineNameRe.FindStringSubmatch(log)
	if len(match) < 2 {
		return ""
	}
	return match[1]
}
// StrConvTime converts a unix timestamp to its decimal string form.
// Negative timestamps (unset times) yield the empty string.
func StrConvTime(t int64) string {
	if t < 0 {
		return ""
	}
	return strconv.FormatInt(t, 10)
}
// GetWorkflowObj returns the Argo Workflow in the infra namespace whose UID
// matches uid. It returns (nil, nil) when no workflow has that UID.
func (ev *subscriberEvents) GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
	conf, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		return nil, err
	}
	// Build the workflow client scoped to the infra namespace.
	wfClient := wfclientset.NewForConfigOrDie(conf).ArgoprojV1alpha1().Workflows(InfraNamespace)
	listWf, err := wfClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	// Scan the listing for the matching UID; index into Items so the
	// returned pointer refers to the list's own element.
	for i := range listWf.Items {
		if string(listWf.Items[i].UID) == uid {
			return &listWf.Items[i], nil
		}
	}
	return nil, nil
}
// ListWorkflowObject lists every Argo Workflow in the infra namespace that
// carries the given workflow_id label.
func (ev *subscriberEvents) ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
	conf, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		return nil, err
	}
	// Create the events client scoped to the infra namespace.
	wfClient := wfclientset.NewForConfigOrDie(conf).ArgoprojV1alpha1().Workflows(InfraNamespace)
	// Select workflows by their workflow_id label.
	opts := v1.ListOptions{
		LabelSelector: fmt.Sprintf("workflow_id=%s", wfid),
	}
	return wfClient.List(context.TODO(), opts)
}
// GenerateWorkflowPayload generates the graphql mutation payload for an
// events event. The whole event is JSON-marshalled and base64-encoded into
// executionData so it survives gql string escaping; completed is spliced in
// verbatim ("true"/"false"). When the event carries a NotifyID, a variant of
// the mutation including notifyID is emitted instead.
// NOTE(review): the mutation is assembled by string concatenation; values
// containing quotes or backslashes (other than node messages, which have
// double quotes stripped below) could break the payload — verify these
// inputs are controlled upstream.
func (ev *subscriberEvents) GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
	infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
	// Strip double quotes from node messages; they would otherwise break
	// the hand-built gql string.
	for id, event := range wfEvent.Nodes {
		event.Message = strings.Replace(event.Message, `"`, ``, -1)
		wfEvent.Nodes[id] = event
	}
	data, err := json.Marshal(wfEvent)
	if err != nil {
		return nil, err
	}
	executionData := base64.StdEncoding.EncodeToString(data)
	mutation := `{ experimentID: \"` + wfEvent.WorkflowID + `\", experimentRunID: \"` + wfEvent.UID + `\", revisionID:\"` + wfEvent.RevisionID + `\", completed: ` + completed + `, experimentName:\"` + wfEvent.Name + `\", infraID: ` + infraID + `, updatedBy:\"` + wfEvent.UpdatedBy + `\", executionData:\"` + executionData + `\"}`
	// Include notifyID only when present on the event.
	if wfEvent.NotifyID != nil {
		mutation = `{ experimentID: \"` + wfEvent.WorkflowID + `\", experimentRunID: \"` + wfEvent.UID + `\", revisionID:\"` + wfEvent.RevisionID + `\", notifyID:\"` + *wfEvent.NotifyID + `\", completed: ` + completed + `, experimentName:\"` + wfEvent.Name + `\", infraID: ` + infraID + `, updatedBy:\"` + wfEvent.UpdatedBy + `\", executionData:\"` + executionData + `\"}`
	}
	var payload = []byte(`{"query":"mutation { chaosExperimentRun(request:` + mutation + ` )}"}`)
	return payload, nil
}
package events
import (
"errors"
"fmt"
"os"
"strconv"
"time"
"subscriber/pkg/types"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions"
litmusV1alpha1 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)
// 0 means no resync
const (
	resyncPeriod time.Duration = 0
)

// eventMap caches the last streamed event per workflow-run UID so chaos data
// and node phases survive across partial updates; entries are evicted once a
// run finishes. Initialized at declaration instead of in an init() func
// (init with side effects is discouraged and unnecessary here).
var eventMap = make(map[string]types.WorkflowEvent)

var (
	// InfraScope selects cluster- vs namespace-scoped watching ("namespace").
	InfraScope = os.Getenv("INFRA_SCOPE")
	// InfraNamespace is the namespace watched when running namespace-scoped.
	InfraNamespace = os.Getenv("INFRA_NAMESPACE")
	// InfraID identifies this chaos infrastructure to the control plane.
	InfraID = os.Getenv("INFRA_ID")
)
// WorkflowEventWatcher initializes the Argo Workflow event watcher and
// starts streaming workflow events onto stream until stopCh is closed.
// It exits the process (Fatal) when the start time, kube config, or client
// set cannot be obtained.
func (ev *subscriberEvents) WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
	startTime, err := strconv.Atoi(infraData["START_TIME"])
	if err != nil {
		logrus.WithError(err).Fatal("Failed to parse START_TIME")
	}
	cfg, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		logrus.WithError(err).Fatal("Could not get kube config")
	}
	// ClientSet to create Informer
	clientSet, err := versioned.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Fatal("Could not generate dynamic client for config")
	}
	// Watch only workflows owned by this infra, matched via labels.
	opts := []externalversions.SharedInformerOption{
		externalversions.WithTweakListOptions(func(list *v1.ListOptions) {
			list.LabelSelector = fmt.Sprintf("infra_id=%s,workflows.argoproj.io/controller-instanceid=%s", InfraID, InfraID)
		}),
	}
	// Build the factory once, scoped to the namespace when the subscriber
	// runs namespace-scoped. (The original always built a cluster-scoped
	// factory and informer first, then discarded them in namespace scope.)
	if InfraScope == "namespace" {
		opts = append(opts, externalversions.WithNamespace(InfraNamespace))
	}
	f := externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, opts...)
	informer := f.Argoproj().V1alpha1().Workflows().Informer()
	// Start Event Watch
	go ev.startWatchWorkflow(stopCh, informer, stream, int64(startTime))
}
// startWatchWorkflow registers ADD/UPDATE handlers on the informer and runs
// it (blocking) until stopCh is closed. Parsed events are pushed onto
// stream; events whose handling fails are logged and dropped instead of
// streaming a zero-value WorkflowEvent (the original streamed the empty
// event even on error). The type assertion is guarded so an unexpected
// object cannot panic the watcher.
func (ev *subscriberEvents) startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
	// emit converts the watched object and streams it; shared by ADD/UPDATE.
	emit := func(obj interface{}, eventType string) {
		workflowObj, ok := obj.(*v1alpha1.Workflow)
		if !ok {
			logrus.Errorf("workflow watcher received unexpected object type %T", obj)
			return
		}
		workflow, err := ev.WorkflowEventHandler(workflowObj, eventType, startTime)
		if err != nil {
			logrus.Error(err)
			return
		}
		// stream
		stream <- workflow
	}
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			emit(obj, "ADD")
		},
		UpdateFunc: func(oldObj, obj interface{}) {
			emit(obj, "UPDATE")
		},
	}
	s.AddEventHandler(handlers)
	s.Run(stopCh)
}
// WorkflowEventHandler is responsible for extracting the required data from
// the watched workflow object and shaping it into a types.WorkflowEvent for
// streaming. Workflows without a workflow_id label are ignored (empty event,
// nil error); workflows created before the subscriber's startTime return an
// error so the caller can skip them.
func (ev *subscriberEvents) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
	if workflowObj.Labels["workflow_id"] == "" {
		logrus.WithFields(map[string]interface{}{
			"uid":         string(workflowObj.ObjectMeta.UID),
			"wf_id":       workflowObj.Labels["workflow_id"],
			"instance_id": workflowObj.Labels["workflows.argoproj.io/controller-instanceid"],
		}).Printf("WORKFLOW RUN IGNORED [INVALID METADATA]")
		return types.WorkflowEvent{}, nil
	}
	// Skip runs that predate this subscriber instance.
	if workflowObj.ObjectMeta.CreationTimestamp.UnixMilli() < startTime {
		return types.WorkflowEvent{}, errors.New("startTime of subscriber is greater than experiment creation timestamp")
	}
	cfg, err := ev.subscriberK8s.GetKubeConfig()
	if err != nil {
		logrus.WithError(err).Fatal("Could not get kube config")
	}
	chaosClient, err := litmusV1alpha1.NewForConfig(cfg)
	if err != nil {
		return types.WorkflowEvent{}, errors.New("could not get Chaos ClientSet: " + err.Error())
	}
	nodes := make(map[string]types.Node)
	logrus.Info("Workflow RUN_ID: ", workflowObj.UID, " and event type: ", eventType)
	for _, nodeStatus := range workflowObj.Status.Nodes {
		var (
			nodeType                 = string(nodeStatus.Type)
			cd       *types.ChaosData = nil
		)
		// considering chaos events has only 1 artifact with manifest as raw data
		if nodeStatus.Type == "Pod" && nodeStatus.Inputs != nil && len(nodeStatus.Inputs.Artifacts) == 1 && nodeStatus.Inputs.Artifacts[0].Raw != nil {
			//extracts chaos data
			nodeType, cd, err = ev.CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
			if err != nil {
				// Extraction failure is logged but the node is still streamed
				// with whatever data is available.
				logrus.WithError(err).Print("Failed to parse ChaosEngine CRD")
			}
		}
		details := types.Node{
			Name:       nodeStatus.DisplayName,
			Phase:      string(nodeStatus.Phase),
			Type:       nodeType,
			StartedAt:  StrConvTime(nodeStatus.StartedAt.Unix()),
			FinishedAt: StrConvTime(nodeStatus.FinishedAt.Unix()),
			Children:   nodeStatus.Children,
			ChaosExp:   cd,
			Message:    nodeStatus.Message,
		}
		// For chaos nodes, the engine's experiment status is more precise
		// than the Argo node phase.
		if nodeType == "ChaosEngine" && cd != nil {
			details.Phase = cd.ExperimentStatus
		}
		nodes[nodeStatus.ID] = details
	}
	status := updateWorkflowStatus(workflowObj.Status.Phase)
	finishedTime := StrConvTime(workflowObj.Status.FinishedAt.Unix())
	// A shutdown spec means the run was stopped by the user: override the
	// status and synthesize a root node marked Stopped.
	if workflowObj.Spec.Shutdown.Enabled() {
		status = "Stopped"
		finishedTime = StrConvTime(time.Now().Unix())
		nodes[workflowObj.Name] = types.Node{
			Name:       workflowObj.Name,
			Phase:      "Stopped",
			StartedAt:  StrConvTime(workflowObj.CreationTimestamp.Unix()),
			FinishedAt: finishedTime,
			Children:   nodes[workflowObj.Name].Children,
			Type:       nodes[workflowObj.Name].Type,
		}
	}
	// notify_id is optional; only forwarded when the label exists.
	var notifyID *string = nil
	if id, ok := workflowObj.Labels["notify_id"]; ok {
		notifyID = &id
	}
	workflow := types.WorkflowEvent{
		WorkflowType:      "events",
		WorkflowID:        workflowObj.Labels["workflow_id"],
		EventType:         eventType,
		RevisionID:        workflowObj.Labels["revision_id"],
		NotifyID:          notifyID,
		UID:               string(workflowObj.ObjectMeta.UID),
		Namespace:         workflowObj.ObjectMeta.Namespace,
		Name:              workflowObj.ObjectMeta.Name,
		CreationTimestamp: StrConvTime(workflowObj.ObjectMeta.CreationTimestamp.Unix()),
		Phase:             status,
		Message:           workflowObj.Status.Message,
		StartedAt:         StrConvTime(workflowObj.Status.StartedAt.Unix()),
		FinishedAt:        finishedTime,
		Nodes:             nodes,
		UpdatedBy:         workflowObj.Labels["updated_by"],
	}
	return workflow, nil
}
// SendWorkflowUpdates generates the graphql mutation for an events update and
// sends it to the graphql server, returning the server's response body.
// It merges cached chaos data from earlier updates of the same run (the
// informer does not always resend it), marks still-active nodes as Stopped
// when the run was stopped, and evicts the run from the cache once finished.
func (ev *subscriberEvents) SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
	if wfEvent, ok := eventMap[event.UID]; ok {
		for key, node := range wfEvent.Nodes {
			// Carry chaos data forward when the incoming update lost it.
			if node.Type == "ChaosEngine" && node.ChaosExp != nil && event.Nodes[key].ChaosExp == nil {
				nodeData := event.Nodes[key]
				nodeData.ChaosExp = node.ChaosExp
				nodeData.Phase = node.Phase
				nodeData.Message = node.Message
				event.Nodes[key] = nodeData
			}
			if event.Phase == "Stopped" {
				if event.Nodes[key].Phase == "Running" || event.Nodes[key].Phase == "Pending" {
					nodeData := event.Nodes[key]
					nodeData.Phase = "Stopped"
					// BUGFIX: set FinishedAt before writing the node back into
					// the map; the original assigned it to a local copy after
					// the store, silently dropping the timestamp.
					nodeData.FinishedAt = event.FinishedAt
					event.Nodes[key] = nodeData
				}
			}
		}
	}
	// Setting up the experiment status
	// based on different probes results
	// present in the experiment
	status, err := getExperimentStatus(event)
	if err != nil {
		// BUGFIX: logrus.WithError(err) alone builds an entry but never logs;
		// emit it at error level.
		logrus.WithError(err).Error("failed to derive experiment status")
	}
	event.Phase = status
	eventMap[event.UID] = event
	// generate graphql payload
	payload, err := ev.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
	if err != nil {
		return "", errors.New("Error while generating graphql payload from the workflow event" + err.Error())
	}
	if event.FinishedAt != "" {
		// Finished run: regenerate the payload with completed=true and evict
		// the cached event.
		payload, err = ev.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
		if err != nil {
			// BUGFIX: the original ignored this error and sent a stale payload.
			return "", errors.New("Error while generating graphql payload from the workflow event" + err.Error())
		}
		delete(eventMap, event.UID)
	}
	body, err := ev.gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], payload)
	if err != nil {
		return "", err
	}
	return body, nil
}
// WorkflowUpdates drains the event channel, forwarding every streamed update
// to the GraphQL server and logging the server's response. It returns when
// the channel is closed.
func (ev *subscriberEvents) WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
	// listen on the channel for streaming event updates
	for update := range event {
		resp, sendErr := ev.SendWorkflowUpdates(infraData, update)
		if sendErr != nil {
			logrus.Print(sendErr.Error())
		}
		logrus.Print("Response from the server: ", resp)
	}
}
// updateWorkflowStatus maps an Argo workflow phase onto the status vocabulary
// used by the control plane. Succeeded and Failed both report as "Completed";
// unknown phases default to "Pending".
func updateWorkflowStatus(status v1alpha1.WorkflowPhase) string {
	switch status {
	case v1alpha1.WorkflowRunning:
		return "Running"
	case v1alpha1.WorkflowSucceeded, v1alpha1.WorkflowFailed:
		// Both terminal outcomes are surfaced as "Completed"; the verdict is
		// carried separately in the chaos data.
		return "Completed"
	case v1alpha1.WorkflowError:
		return "Error"
	default:
		// Covers v1alpha1.WorkflowPending and anything unrecognized.
		return "Pending"
	}
}
// getExperimentStatus derives the final experiment status from the per-fault
// node phases (which carry the probe results). Priority: any errored fault
// forces Error; otherwise any fault completed with a probe failure forces
// that status; otherwise the workflow-level phase passes through unchanged.
// Stopped or still-running experiments keep their workflow phase as-is.
func getExperimentStatus(experiment types.WorkflowEvent) (string, error) {
	status := experiment.Phase
	if status == "" {
		return "", errors.New("status is invalid")
	}
	// Only finished, non-stopped experiments are refined via node statuses.
	if status == "Stopped" || experiment.FinishedAt == "" {
		return status, nil
	}
	var errored, probeFailed int
	for _, node := range experiment.Nodes {
		// A ChaosEngine node with no chaos data counts as an error.
		if node.Type == "ChaosEngine" && node.ChaosExp == nil {
			errored++
			continue
		}
		switch node.Phase {
		case string(types.FaultCompletedWithProbeFailure):
			probeFailed++
		case string(types.Error):
			errored++
		}
	}
	// With multiple faults, an error takes priority over probe failures.
	if errored > 0 {
		return string(types.Error), nil
	}
	if probeFailed > 0 {
		return string(types.FaultCompletedWithProbeFailure), nil
	}
	return status, nil
}