1
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/config/subscription.h"
6

            
7
#include "source/common/common/assert.h"
8
#include "source/common/common/logger.h"
9
#include "source/common/common/utility.h"
10
#include "source/common/config/xds_resource.h"
11
#include "source/common/grpc/common.h"
12
#include "source/common/protobuf/protobuf.h"
13
#include "source/common/protobuf/utility.h"
14

            
15
namespace Envoy {
16
namespace Config {
17

            
18
constexpr std::chrono::milliseconds UpdateDurationLogThreshold = std::chrono::milliseconds(50);
19

            
20
GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
21
                                           SubscriptionCallbacks& callbacks,
22
                                           OpaqueResourceDecoderSharedPtr resource_decoder,
23
                                           SubscriptionStats stats, absl::string_view type_url,
24
                                           Event::Dispatcher& dispatcher,
25
                                           std::chrono::milliseconds init_fetch_timeout,
26
                                           bool is_aggregated, const SubscriptionOptions& options)
27
2627
    : grpc_mux_(grpc_mux), callbacks_(callbacks), resource_decoder_(resource_decoder),
28
2627
      stats_(stats), type_url_(type_url), dispatcher_(dispatcher),
29
2627
      init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated), options_(options) {}
30

            
31
// Config::Subscription
32
2569
void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
33
2569
  if (init_fetch_timeout_.count() > 0) {
34
2539
    init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
35
51
      onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
36
51
    });
37
2539
    init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
38
2539
  }
39

            
40
2569
  watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, options_);
41

            
42
  // The attempt stat here is maintained for the purposes of having consistency between ADS and
43
  // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
44
  // "attempt" for a given xDS API combined by ADS is not really that meaningful.
45
2569
  stats_.update_attempt_.inc();
46

            
47
  // ADS initial request batching relies on the users of the GrpcMux *not* calling start on it,
48
  // whereas non-ADS xDS users must call it themselves.
49
2569
  if (!is_aggregated_) {
50
1182
    grpc_mux_->start();
51
1182
  }
52
2569
}
53

            
54
void GrpcSubscriptionImpl::updateResourceInterest(
55
30
    const absl::flat_hash_set<std::string>& update_to_these_names) {
56
30
  watch_->update(update_to_these_names);
57
30
  stats_.update_attempt_.inc();
58
30
}
59

            
60
void GrpcSubscriptionImpl::requestOnDemandUpdate(
61
52
    const absl::flat_hash_set<std::string>& for_update) {
62
52
  grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
63
52
  stats_.update_attempt_.inc();
64
52
}
65

            
66
// Config::SubscriptionCallbacks
67
absl::Status
68
GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
69
2197
                                     const std::string& version_info) {
70
2197
  disableInitFetchTimeoutTimer();
71
  // TODO(mattklein123): In the future if we start tracking per-resource versions, we need to
72
  // supply those versions to onConfigUpdate() along with the xDS response ("system")
73
  // version_info. This way, both types of versions can be tracked and exposed for debugging by
74
  // the configuration update targets.
75
2197
  auto start = dispatcher_.timeSource().monotonicTime();
76
2197
  absl::Status status = callbacks_.onConfigUpdate(resources, version_info);
77
2197
  if (!status.ok()) {
78
22
    return status;
79
22
  }
80
2175
  std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
81
2175
      dispatcher_.timeSource().monotonicTime() - start);
82
2175
  stats_.update_success_.inc();
83
2175
  stats_.update_attempt_.inc();
84
2175
  stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
85
2175
  stats_.version_.set(HashUtil::xxHash64(version_info));
86
2175
  stats_.version_text_.set(version_info);
87
2175
  stats_.update_duration_.recordValue(update_duration.count());
88
2175
  ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources with version {}", type_url_,
89
2175
            resources.size(), version_info);
90

            
91
2175
  if (update_duration > UpdateDurationLogThreshold) {
92
37
    ENVOY_LOG(debug, "gRPC config update took {} ms! Resources names: {}", update_duration.count(),
93
37
              absl::StrJoin(resources, ",", ResourceNameFormatter()));
94
37
  }
95
2175
  return absl::OkStatus();
96
2197
}
97

            
98
absl::Status GrpcSubscriptionImpl::onConfigUpdate(
99
    const std::vector<Config::DecodedResourceRef>& added_resources,
100
    const Protobuf::RepeatedPtrField<std::string>& removed_resources,
101
1857
    const std::string& system_version_info) {
102
1857
  disableInitFetchTimeoutTimer();
103
1857
  stats_.update_attempt_.inc();
104
1857
  auto start = dispatcher_.timeSource().monotonicTime();
105
1857
  absl::Status status =
106
1857
      callbacks_.onConfigUpdate(added_resources, removed_resources, system_version_info);
107
1857
  if (!status.ok()) {
108
4
    return status;
109
4
  }
110
1853
  std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
111
1853
      dispatcher_.timeSource().monotonicTime() - start);
112
1853
  stats_.update_success_.inc();
113
1853
  stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
114
1853
  stats_.version_.set(HashUtil::xxHash64(system_version_info));
115
1853
  stats_.version_text_.set(system_version_info);
116
1853
  stats_.update_duration_.recordValue(update_duration.count());
117
1853
  return absl::OkStatus();
118
1857
}
119

            
120
void GrpcSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason reason,
121
3487
                                                const EnvoyException* e) {
122
3487
  switch (reason) {
123
3296
  case Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure:
124
3296
    stats_.update_failure_.inc();
125
3296
    ENVOY_LOG(debug, "gRPC update for {} failed", type_url_);
126
3296
    break;
127
51
  case Envoy::Config::ConfigUpdateFailureReason::FetchTimedout:
128
51
    stats_.init_fetch_timeout_.inc();
129
51
    disableInitFetchTimeoutTimer();
130
51
    ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_);
131
51
    callbacks_.onConfigUpdateFailed(reason, e);
132
51
    break;
133
140
  case Envoy::Config::ConfigUpdateFailureReason::UpdateRejected:
134
    // We expect Envoy exception to be thrown when update is rejected.
135
140
    ASSERT(e != nullptr);
136
140
    disableInitFetchTimeoutTimer();
137
140
    stats_.update_rejected_.inc();
138
140
    ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what());
139
140
    callbacks_.onConfigUpdateFailed(reason, e);
140
140
    break;
141
3487
  }
142

            
143
3487
  stats_.update_attempt_.inc();
144
3487
}
145

            
146
6
ScopedResume GrpcSubscriptionImpl::pause() { return grpc_mux_->pause(type_url_); }
147

            
148
4245
void GrpcSubscriptionImpl::disableInitFetchTimeoutTimer() {
149
4245
  if (init_fetch_timeout_timer_) {
150
2344
    init_fetch_timeout_timer_->disableTimer();
151
2344
    init_fetch_timeout_timer_.reset();
152
2344
  }
153
4245
}
154

            
155
GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
156
    const xds::core::v3::ResourceLocator& collection_locator, GrpcMuxSharedPtr grpc_mux,
157
    SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
158
    SubscriptionStats stats, Event::Dispatcher& dispatcher,
159
    std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
160
    const SubscriptionOptions& options)
161
74
    : GrpcSubscriptionImpl(
162
74
          grpc_mux, callbacks, resource_decoder, stats,
163
74
          TypeUtil::descriptorFullNameToTypeUrl(collection_locator.resource_type()), dispatcher,
164
74
          init_fetch_timeout, is_aggregated, options),
165
74
      collection_locator_(collection_locator) {}
166

            
167
74
void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
168
74
  ASSERT(resource_names.empty());
169
74
  GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});
170
74
}
171

            
172
} // namespace Config
173
} // namespace Envoy