1
#include "source/extensions/common/aws/metadata_fetcher.h"
2

            
3
#include <memory>
4

            
5
#include "source/common/common/enum_to_int.h"
6
#include "source/common/http/message_impl.h"
7
#include "source/common/http/utility.h"
8

            
9
namespace Envoy {
10
namespace Extensions {
11
namespace Common {
12
namespace Aws {
13

            
14
namespace {
15

            
16
class MetadataFetcherImpl : public MetadataFetcher,
17
                            public Logger::Loggable<Logger::Id::aws>,
18
                            public Http::AsyncClient::Callbacks,
19
                            public std::enable_shared_from_this<MetadataFetcherImpl> {
20

            
21
public:
22
  MetadataFetcherImpl(Upstream::ClusterManager& cm, absl::string_view cluster_name)
23
30
      : cm_(cm), cluster_name_(std::string(cluster_name)) {}
24

            
25
30
  ~MetadataFetcherImpl() override {
26
30
    if (request_.has_value() && !complete_.load()) {
27
      if (!cm_.isShutdown()) {
28
        const auto thread_local_cluster = cm_.getThreadLocalCluster(cluster_name_);
29
        if (thread_local_cluster != nullptr) {
30
          thread_local_cluster->httpAsyncClient().dispatcher().post(
31
              [self_ref = self_ref_]() { self_ref->cancel(); });
32
          return;
33
        }
34
      }
35
    }
36
30
    receiver_.reset();
37
30
    complete_.store(true);
38
30
    request_.reset();
39
30
    self_ref_.reset();
40
30
  }
41

            
42
21
  void cancel() override {
43
21
    if (request_.has_value() && !complete_.load()) {
44
1
      request_->cancel();
45
1
      ENVOY_LOG(debug, "fetch AWS Metadata [cluster = {}]: cancelled", cluster_name_);
46
1
    }
47

            
48
21
    receiver_.reset();
49
21
    complete_.store(true);
50
21
    request_.reset();
51
21
    self_ref_.reset();
52
21
  }
53

            
54
15
  absl::string_view failureToString(MetadataFetcher::MetadataReceiver::Failure reason) override {
55
15
    switch (reason) {
56
12
    case MetadataFetcher::MetadataReceiver::Failure::Network:
57
12
      return "Network";
58
1
    case MetadataFetcher::MetadataReceiver::Failure::InvalidMetadata:
59
1
      return "InvalidMetadata";
60
1
    case MetadataFetcher::MetadataReceiver::Failure::MissingConfig:
61
1
      return "MissingConfig";
62
1
    default:
63
1
      return "";
64
15
    }
65
15
  }
66

            
67
  void fetch(Http::RequestMessage& message, Tracing::Span& parent_span,
68
31
             MetadataFetcher::MetadataReceiver& receiver) override {
69
31
    ASSERT(!request_);
70
31
    complete_.store(false);
71
31
    receiver_ = makeOptRef(receiver);
72

            
73
    // Stop processing if we are shutting down
74
31
    if (cm_.isShutdown()) {
75
5
      return;
76
5
    }
77

            
78
26
    const auto thread_local_cluster = cm_.getThreadLocalCluster(cluster_name_);
79
26
    if (thread_local_cluster == nullptr) {
80
1
      ENVOY_LOG(error, "{} AWS Metadata failed: [cluster = {}] not found", __func__, cluster_name_);
81
1
      complete_.store(true);
82
1
      receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::MissingConfig);
83
1
      request_.reset();
84
1
      self_ref_.reset();
85
1
      return;
86
1
    }
87

            
88
25
    constexpr uint64_t MAX_RETRIES = 3;
89
25
    constexpr uint64_t RETRY_DELAY = 1000;
90
25
    constexpr uint64_t TIMEOUT = 5 * 1000;
91

            
92
25
    const auto host_attributes = Http::Utility::parseAuthority(message.headers().getHostValue());
93
25
    const auto host = host_attributes.host_;
94
25
    const auto path = message.headers().getPathValue();
95
25
    const auto scheme = message.headers().getSchemeValue();
96
25
    const auto method = message.headers().getMethodValue();
97

            
98
25
    const size_t query_offset = path.find('?');
99
    // Sanitize the path before logging.
100
    // However, the route debug log will still display the entire path.
101
    // So safely store the Envoy logs at debug level.
102
25
    const absl::string_view sanitized_path =
103
25
        query_offset != absl::string_view::npos ? path.substr(0, query_offset) : path;
104
25
    ENVOY_LOG(debug, "fetch AWS Metadata from the cluster {} at [uri = {}]", cluster_name_,
105
25
              fmt::format("{}://{}{}", scheme, host, sanitized_path));
106

            
107
25
    Http::RequestHeaderMapPtr headersPtr =
108
25
        Envoy::Http::createHeaderMap<Envoy::Http::RequestHeaderMapImpl>(
109
25
            {{Envoy::Http::Headers::get().Method, std::string(method)},
110
25
             {Envoy::Http::Headers::get().Host, std::string(host)},
111
25
             {Envoy::Http::Headers::get().Scheme, std::string(scheme)},
112
25
             {Envoy::Http::Headers::get().Path, std::string(path)}});
113

            
114
    // Copy the remaining headers.
115
25
    message.headers().iterate(
116
93
        [&headersPtr](const Http::HeaderEntry& entry) -> Http::HeaderMap::Iterate {
117
          // Skip pseudo-headers
118
89
          if (!entry.key().getStringView().empty() && entry.key().getStringView()[0] == ':') {
119
65
            return Http::HeaderMap::Iterate::Continue;
120
65
          }
121
24
          headersPtr->addCopy(Http::LowerCaseString(entry.key().getStringView()),
122
24
                              entry.value().getStringView());
123
24
          return Http::HeaderMap::Iterate::Continue;
124
89
        });
125

            
126
25
    auto messagePtr = std::make_unique<Envoy::Http::RequestMessageImpl>(std::move(headersPtr));
127
    // Add body if it exists, used when IAM Roles Anywhere exchanges X509 credentials for temporary
128
    // credentials
129
25
    if (message.body().length()) {
130
2
      messagePtr->body().add(message.body());
131
2
    }
132
25
    auto options = Http::AsyncClient::RequestOptions()
133
25
                       .setTimeout(std::chrono::milliseconds(TIMEOUT))
134
25
                       .setParentSpan(parent_span)
135
25
                       .setSendXff(false)
136
25
                       .setChildSpanName("AWS Metadata Fetch");
137

            
138
25
    envoy::config::route::v3::RetryPolicy route_retry_policy;
139
25
    route_retry_policy.mutable_num_retries()->set_value(MAX_RETRIES);
140
25
    route_retry_policy.mutable_per_try_timeout()->CopyFrom(
141
25
        Protobuf::util::TimeUtil::MillisecondsToDuration(TIMEOUT));
142
25
    route_retry_policy.mutable_per_try_idle_timeout()->CopyFrom(
143
25
        Protobuf::util::TimeUtil::MillisecondsToDuration(RETRY_DELAY));
144
25
    route_retry_policy.set_retry_on("5xx,gateway-error,connect-failure,reset,refused-stream");
145

            
146
25
    options.setRetryPolicy(route_retry_policy);
147
25
    options.setBufferBodyForRetry(true);
148
    // Keep object alive during async operation
149
25
    self_ref_ = shared_from_this();
150
25
    request_ = makeOptRefFromPtr(
151
25
        thread_local_cluster->httpAsyncClient().send(std::move(messagePtr), *this, options));
152

            
153
25
    if (!request_) {
154
1
      self_ref_.reset();
155
1
    }
156
25
  }
157

            
158
  // HTTP async receive method on success.
159
10
  void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) override {
160
    // Capture self-reference immediately to keep object alive during method execution
161
10
    auto self_ref = std::move(self_ref_);
162

            
163
    // Safe early exit if object is being destroyed
164
10
    if (!receiver_ || complete_.load()) {
165
      return;
166
    }
167
10
    complete_.store(true);
168
10
    const uint64_t status_code = Http::Utility::getResponseStatus(response->headers());
169
10
    if (status_code == enumToInt(Http::Code::OK) ||
170
10
        (status_code == enumToInt(Http::Code::Created))) {
171
4
      ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: success", __func__, cluster_name_);
172
4
      if (response->body().length() != 0) {
173
3
        const auto body = response->bodyAsString();
174
3
        receiver_->onMetadataSuccess(std::move(body));
175
3
      } else {
176
1
        ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: body is empty", __func__,
177
1
                  cluster_name_);
178
1
        receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::InvalidMetadata);
179
1
      }
180
8
    } else {
181
6
      if (response->body().length() != 0) {
182
5
        ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: response status code {}, body: {}",
183
5
                  __func__, cluster_name_, status_code, response->bodyAsString());
184
5
      } else {
185
1
        ENVOY_LOG(debug,
186
1
                  "{}: fetch AWS Metadata [cluster = {}]: response status code {}, body is empty",
187
1
                  __func__, cluster_name_, status_code);
188
1
      }
189
6
      receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::Network);
190
6
    }
191
10
    request_.reset();
192
10
  }
193

            
194
  // HTTP async receive method on failure.
195
  void onFailure(const Http::AsyncClient::Request&,
196
13
                 Http::AsyncClient::FailureReason reason) override {
197
    // Capture self-reference immediately to keep object alive during method execution
198
13
    auto self_ref = std::move(self_ref_);
199

            
200
    // Safe early exit if object is being destroyed
201
13
    if (!receiver_ || complete_.load()) {
202
      return;
203
    }
204
13
    ENVOY_LOG(debug, "{}: fetch AWS Metadata [cluster = {}]: network error {}", __func__,
205
13
              cluster_name_, enumToInt(reason));
206
13
    complete_.store(true);
207
13
    receiver_->onMetadataError(MetadataFetcher::MetadataReceiver::Failure::Network);
208
13
    request_.reset();
209
13
  }
210

            
211
  // TODO(suniltheta): Add metadata fetch status into the span like it is done on ext_authz filter.
212
15
  void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {}
213

            
214
private:
215
  std::atomic<bool> complete_{false};
216
  Upstream::ClusterManager& cm_;
217
  const std::string cluster_name_;
218
  OptRef<MetadataFetcher::MetadataReceiver> receiver_;
219
  OptRef<Http::AsyncClient::Request> request_;
220
  std::shared_ptr<MetadataFetcherImpl> self_ref_; // Keep self alive during async ops
221

            
222
  void reset() {
223
    request_.reset();
224
    self_ref_.reset();
225
  }
226
};
227
} // namespace
228

            
229
// TODO(nbaws): Change api to return shared_ptr and remove wrapper
230
class MetadataFetcherWrapper : public MetadataFetcher {
231
public:
232
  explicit MetadataFetcherWrapper(std::shared_ptr<MetadataFetcherImpl> impl)
233
30
      : impl_(std::move(impl)) {}
234

            
235
21
  void cancel() override { impl_->cancel(); }
236
15
  absl::string_view failureToString(MetadataReceiver::Failure reason) override {
237
15
    return impl_->failureToString(reason);
238
15
  }
239
  void fetch(Http::RequestMessage& message, Tracing::Span& parent_span,
240
31
             MetadataReceiver& receiver) override {
241
31
    impl_->fetch(message, parent_span, receiver);
242
31
  }
243

            
244
private:
245
  std::shared_ptr<MetadataFetcherImpl> impl_;
246
};
247

            
248
// Factory for metadata fetchers.
// Allocates a MetadataFetcherImpl owned by a shared_ptr (the implementation derives from
// std::enable_shared_from_this) and returns it wrapped in a MetadataFetcherWrapper.
//
// @param cm cluster manager handed to the implementation.
// @param cluster_name name of the cluster handed to the implementation.
// @return the wrapper owning the newly created implementation.
MetadataFetcherPtr MetadataFetcher::create(Upstream::ClusterManager& cm,
                                           absl::string_view cluster_name) {
  return std::make_unique<MetadataFetcherWrapper>(
      std::make_shared<MetadataFetcherImpl>(cm, cluster_name));
}
253
} // namespace Aws
254
} // namespace Common
255
} // namespace Extensions
256
} // namespace Envoy