1
#include "source/extensions/common/aws/metadata_credentials_provider_base.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/server/factory_context.h"
6

            
7
namespace Envoy {
8
namespace Extensions {
9
namespace Common {
10
namespace Aws {
11

            
12
MetadataCredentialsProviderBase::MetadataCredentialsProviderBase(
13
    Server::Configuration::ServerFactoryContext& context, AwsClusterManagerPtr aws_cluster_manager,
14
    absl::string_view cluster_name, CreateMetadataFetcherCb create_metadata_fetcher_cb,
15
    MetadataFetcher::MetadataReceiver::RefreshState refresh_state,
16
    std::chrono::seconds initialization_timer)
17
197
    : context_(context), create_metadata_fetcher_cb_(create_metadata_fetcher_cb),
18
197
      cluster_name_(cluster_name), cache_duration_(getCacheDuration()),
19
197
      refresh_state_(refresh_state), initialization_timer_(initialization_timer),
20
197
      aws_cluster_manager_(aws_cluster_manager) {
21

            
22
  // Set up metadata credentials statistics
23
197
  scope_ = context_.api().rootScope().createScope(
24
197
      fmt::format("aws.metadata_credentials_provider.{}.", cluster_name_));
25
197
  stats_ = std::make_shared<MetadataCredentialsProviderStats>(MetadataCredentialsProviderStats{
26
197
      ALL_METADATACREDENTIALSPROVIDER_STATS(POOL_COUNTER(*scope_), POOL_GAUGE(*scope_))});
27
197
  stats_->metadata_refresh_state_.set(uint64_t(refresh_state_));
28

            
29
197
  tls_slot_ =
30
197
      ThreadLocal::TypedSlot<ThreadLocalCredentialsCache>::makeUnique(context_.threadLocal());
31

            
32
197
  tls_slot_->set(
33
239
      [&](Event::Dispatcher&) { return std::make_shared<ThreadLocalCredentialsCache>(); });
34
197
};
35

            
36
197
MetadataCredentialsProviderBase::~MetadataCredentialsProviderBase() {
37
197
  cancel_credentials_update_callback_();
38
197
  if (metadata_fetcher_) {
39
103
    metadata_fetcher_->cancel();
40
103
  }
41
197
}
42

            
43
139
void MetadataCredentialsProviderBase::onClusterAddOrUpdate() {
44
139
  ENVOY_LOG(debug, "Received callback from aws cluster manager for cluster {}", cluster_name_);
45
139
  if (!cache_duration_timer_) {
46
135
    std::weak_ptr<MetadataCredentialsProviderStats> weak_stats = stats_;
47
135
    std::weak_ptr<MetadataCredentialsProviderBase> weak_self = shared_from_this();
48
135
    cache_duration_timer_ =
49
140
        context_.mainThreadDispatcher().createTimer([weak_stats, weak_self]() -> void {
50
138
          if (auto stats = weak_stats.lock()) {
51
137
            stats->credential_refreshes_performed_.inc();
52
137
          }
53
138
          if (auto self = weak_self.lock()) {
54
138
            self->refresh();
55
138
          }
56
138
        });
57
135
  }
58
139
  if (!cache_duration_timer_->enabled()) {
59
47
    cache_duration_timer_->enableTimer(std::chrono::milliseconds(1));
60
47
  }
61
139
}
62

            
63
90
void MetadataCredentialsProviderBase::credentialsRetrievalError() {
64
  // Credential retrieval failed, so set blank (anonymous) credentials
65
90
  stats_->credential_refreshes_failed_.inc();
66
90
  ENVOY_LOG(debug, "Error retrieving credentials, settings anonymous credentials");
67
90
  setCredentialsToAllThreads(std::make_unique<Credentials>());
68
90
  handleFetchDone();
69
90
}
70

            
71
4
bool MetadataCredentialsProviderBase::credentialsPending() { return credentials_pending_; }
72

            
73
83
Credentials MetadataCredentialsProviderBase::getCredentials() {
74
83
  return *(*tls_slot_)->credentials_.get();
75
83
}
76

            
77
// getCacheDuration will return a duration between 3566 and 3595 seconds, IE close to 1 hour with
78
// jitter.
79
250
std::chrono::seconds MetadataCredentialsProviderBase::getCacheDuration() {
80
250
  const auto jitter =
81
250
      std::chrono::seconds(context_.api().randomGenerator().random() % MAX_CACHE_JITTER.count());
82
250
  return std::chrono::seconds(REFRESH_INTERVAL - REFRESH_GRACE_PERIOD - jitter);
83
250
}
84

            
85
130
void MetadataCredentialsProviderBase::handleFetchDone() {
86
130
  if (cache_duration_timer_ && !cache_duration_timer_->enabled()) {
87
    // Receiver state handles the initial credential refresh scenario. If for some reason we are
88
    // unable to perform credential refresh after cluster initialization has completed, we use a
89
    // short timer to keep retrying. Once successful, we fall back to the normal cache duration
90
    // or whatever expiration is provided in the credential payload
91
105
    if (refresh_state_ == MetadataFetcher::MetadataReceiver::RefreshState::FirstRefresh) {
92
40
      cache_duration_timer_->enableTimer(initialization_timer_);
93
40
      ENVOY_LOG(debug, "Metadata fetcher initialization failed, retrying in {}",
94
40
                std::chrono::seconds(initialization_timer_.count()));
95
      // Timer begins at 2 seconds and doubles each time, to a maximum of 32 seconds. This avoids
96
      // excessive retries against STS or instance metadata service
97
40
      if (initialization_timer_ < std::chrono::seconds(32)) {
98
38
        initialization_timer_ = initialization_timer_ * 2;
99
38
      }
100
91
    } else {
101
      // If our returned token had an expiration time, use that to set the cache duration
102
65
      const auto now = context_.api().timeSource().systemTime();
103
65
      if (expiration_time_.has_value() && (expiration_time_.value() > now)) {
104
12
        auto time_until_expiration = expiration_time_.value() - now;
105
12
        auto grace_period =
106
12
            std::chrono::duration_cast<std::chrono::system_clock::duration>(REFRESH_GRACE_PERIOD);
107

            
108
        // Subtract grace period, but ensure we don't go negative
109
12
        if (time_until_expiration > grace_period) {
110
11
          cache_duration_ = std::chrono::duration_cast<std::chrono::seconds>(time_until_expiration -
111
11
                                                                             grace_period);
112
11
        } else {
113
1
          ENVOY_LOG(warn,
114
1
                    "Credential expiration time is within grace period {} seconds, refreshing now. "
115
1
                    "Minimum expiration time should be 900 seconds (15 minutes).",
116
1
                    REFRESH_GRACE_PERIOD.count());
117
1
          cache_duration_ = std::chrono::seconds(1);
118
1
        }
119

            
120
12
        ENVOY_LOG(debug,
121
12
                  "Metadata fetcher setting credential refresh to {}, based on "
122
12
                  "credential expiration with grace period",
123
12
                  std::chrono::seconds(cache_duration_.count()));
124
54
      } else {
125
53
        cache_duration_ = getCacheDuration();
126
53
        ENVOY_LOG(debug,
127
53
                  "Metadata fetcher setting credential refresh to {}, based on default expiration",
128
53
                  std::chrono::seconds(cache_duration_.count()));
129
53
      }
130
65
      cache_duration_timer_->enableTimer(
131
65
          std::chrono::duration_cast<std::chrono::milliseconds>(cache_duration_));
132
65
    }
133
105
  }
134
130
}
135

            
136
void MetadataCredentialsProviderBase::setCredentialsToAllThreads(
137
126
    CredentialsConstUniquePtr&& creds) {
138

            
139
126
  ENVOY_LOG(debug, "{}: Setting credentials to all threads", this->providerName());
140

            
141
126
  CredentialsConstSharedPtr shared_credentials = std::move(creds);
142
126
  if (tls_slot_ && !tls_slot_->isShutdown()) {
143
114
    tls_slot_->runOnAllThreads(
144
114
        /* Set the credentials */ [shared_credentials](
145
114
                                      OptRef<ThreadLocalCredentialsCache>
146
117
                                          obj) { obj->credentials_ = shared_credentials; },
147
        /* Notify waiting signers on completion of credential setting above */
148
114
        CancelWrapper::cancelWrapped(
149
114
            [this]() {
150
112
              credentials_pending_.store(false);
151
112
              std::list<std::weak_ptr<CredentialSubscriberCallbacks>> subscribers_copy;
152
112
              {
153
112
                Thread::LockGuard guard(mu_);
154
112
                subscribers_copy = credentials_subscribers_;
155
112
              }
156
112
              for (auto& weak_cb : subscribers_copy) {
157
7
                if (auto cb = weak_cb.lock()) {
158
6
                  ENVOY_LOG(debug, "Notifying subscriber of credential update");
159
6
                  cb->onCredentialUpdate();
160
6
                }
161
7
              }
162
112
            },
163
114
            &cancel_credentials_update_callback_));
164
114
  }
165
126
}
166

            
167
CredentialSubscriberCallbacksHandlePtr
168
MetadataCredentialsProviderBase::subscribeToCredentialUpdates(
169
88
    CredentialSubscriberCallbacksSharedPtr cs) {
170
88
  Thread::LockGuard guard(mu_);
171
88
  return std::make_unique<CredentialSubscriberCallbacksHandle>(cs, credentials_subscribers_);
172
88
}
173

            
174
} // namespace Aws
175
} // namespace Common
176
} // namespace Extensions
177
} // namespace Envoy