1
#include "source/extensions/common/aws/aws_cluster_manager.h"
2

            
3
#include "envoy/server/factory_context.h"
4

            
5
#include "source/extensions/common/aws/utility.h"
6

            
7
namespace Envoy {
8
namespace Extensions {
9
namespace Common {
10
namespace Aws {
11

            
12
AwsClusterManagerImpl::AwsClusterManagerImpl(Server::Configuration::ServerFactoryContext& context)
13
111
    : context_(context) {
14

            
15
  // If we are still initializing, defer cluster creation using an init target
16
111
  if (context_.initManager().state() == Envoy::Init::Manager::State::Initialized) {
17
2
    queue_clusters_.exchange(false);
18
2
    cm_handle_ = context_.clusterManager().addThreadLocalClusterUpdateCallbacks(*this);
19
109
  } else {
20
109
    init_target_ = std::make_unique<Init::TargetImpl>("aws_cluster_manager", [this]() -> void {
21
29
      queue_clusters_.exchange(false);
22
29
      cm_handle_ = context_.clusterManager().addThreadLocalClusterUpdateCallbacks(*this);
23
29
      createQueuedClusters();
24

            
25
29
      init_target_->ready();
26
29
      init_target_.reset();
27
29
    });
28
109
    context_.initManager().add(*init_target_);
29
109
  }
30
111
};
31

            
32
absl::StatusOr<AwsManagedClusterUpdateCallbacksHandlePtr>
33
AwsClusterManagerImpl::addManagedClusterUpdateCallbacks(absl::string_view cluster_name,
34
117
                                                        AwsManagedClusterUpdateCallbacks& cb) {
35
117
  auto it = managed_clusters_.find(cluster_name);
36
117
  ENVOY_LOG(debug, "Adding callback for cluster {}", cluster_name);
37
117
  if (it == managed_clusters_.end()) {
38
1
    return absl::InvalidArgumentError("Cluster not found");
39
1
  }
40
116
  auto managed_cluster = it->second.get();
41
  // If the cluster is already alive, signal the callback immediately to start retrieving
42
  // credentials
43
116
  if (!managed_cluster->is_creating_) {
44
10
    ENVOY_LOG(debug, "Managed cluster {} is ready immediately, calling callback", cluster_name);
45
10
    cb.onClusterAddOrUpdate();
46
10
    return absl::AlreadyExistsError("Cluster already online");
47
10
  }
48
106
  return std::make_unique<AwsManagedClusterUpdateCallbacksHandle>(
49
106
      context_, cb, managed_cluster->update_callbacks_);
50
116
}
51

            
52
void AwsClusterManagerImpl::onClusterAddOrUpdate(absl::string_view cluster_name,
53
43
                                                 Upstream::ThreadLocalClusterCommand&) {
54
  // Mark our cluster as ready for use
55
43
  auto it = managed_clusters_.find(cluster_name);
56
43
  if (it != managed_clusters_.end()) {
57
42
    auto managed_cluster = it->second.get();
58
42
    managed_cluster->is_creating_.store(false);
59
42
    for (auto& cb : managed_cluster->update_callbacks_) {
60
39
      ENVOY_LOG(debug, "Managed cluster {} is ready, calling callback", cluster_name);
61
39
      cb->onClusterAddOrUpdate();
62
39
    }
63
42
  }
64
43
}
65

            
66
// No removal handler required, as we are using avoid_cds_removal flag
67
2
void AwsClusterManagerImpl::onClusterRemoval(const std::string&) {};
68

            
69
29
void AwsClusterManagerImpl::createQueuedClusters() {
70
29
  std::vector<std::string> failed_clusters;
71
38
  for (const auto& it : managed_clusters_) {
72
32
    auto cluster_name = it.first;
73
32
    auto cluster_type = it.second->cluster_type_;
74
32
    auto uri = it.second->uri_;
75
32
    auto cluster = Utility::createInternalClusterStatic(cluster_name, cluster_type, uri);
76
32
    auto status = context_.clusterManager().addOrUpdateCluster(cluster, "", true);
77
32
    if (!status.ok()) {
78
1
      ENVOY_LOG(debug, "Failed to add cluster {} to cluster manager: {}", cluster_name,
79
1
                status.status().ToString());
80
1
      failed_clusters.push_back(cluster_name);
81
1
    }
82
32
  }
83
29
  for (const auto& cluster_name : failed_clusters) {
84
1
    managed_clusters_.erase(cluster_name);
85
1
  }
86
29
}
87

            
88
absl::Status AwsClusterManagerImpl::addManagedCluster(
89
    absl::string_view cluster_name,
90
119
    const envoy::config::cluster::v3::Cluster::DiscoveryType cluster_type, absl::string_view uri) {
91

            
92
119
  auto it = managed_clusters_.find(cluster_name);
93
119
  if (it == managed_clusters_.end()) {
94
105
    auto new_cluster = std::make_unique<CredentialsProviderCluster>(cluster_type, std::string(uri));
95
105
    auto inserted = managed_clusters_.insert({std::string(cluster_name), std::move(new_cluster)});
96
105
    if (inserted.second) {
97
105
      it = inserted.first;
98
105
      it->second->is_creating_.store(true);
99
105
      ENVOY_LOG(debug, "Added cluster {} to list, cluster list len {}", cluster_name,
100
105
                managed_clusters_.size());
101

            
102
105
      auto cluster = Utility::createInternalClusterStatic(cluster_name, cluster_type, uri);
103
105
      if (!queue_clusters_) {
104
10
        auto status = context_.clusterManager().addOrUpdateCluster(cluster, "", true);
105
10
        if (!status.ok()) {
106
1
          ENVOY_LOG(debug, "Failed to add cluster {} to cluster manager: {}", cluster_name,
107
1
                    status.status().ToString());
108
1
          managed_clusters_.erase(cluster_name);
109
1
          return status.status();
110
1
        }
111
10
      }
112
105
    }
113
104
    return absl::OkStatus();
114
106
  } else {
115
14
    ENVOY_LOG(debug, "Cluster {} already exists, not readding", cluster_name);
116
14
    return absl::AlreadyExistsError("Cluster already exists");
117
14
  }
118
119
}
119

            
120
absl::StatusOr<std::string>
121
21
AwsClusterManagerImpl::getUriFromClusterName(absl::string_view cluster_name) {
122
21
  auto it = managed_clusters_.find(cluster_name);
123
21
  if (it == managed_clusters_.end()) {
124
3
    return absl::InvalidArgumentError("Cluster not found");
125
3
  }
126
18
  return it->second->uri_;
127
21
}
128

            
129
} // namespace Aws
130
} // namespace Common
131
} // namespace Extensions
132
} // namespace Envoy