Line data Source code
1 : #include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
2 :
3 : #include <chrono>
4 :
5 : #include "envoy/config/subscription.h"
6 :
7 : #include "source/common/common/assert.h"
8 : #include "source/common/common/logger.h"
9 : #include "source/common/common/utility.h"
10 : #include "source/common/config/xds_resource.h"
11 : #include "source/common/grpc/common.h"
12 : #include "source/common/protobuf/protobuf.h"
13 : #include "source/common/protobuf/utility.h"
14 :
15 : namespace Envoy {
16 : namespace Config {
17 :
// Config updates whose callback runs longer than this get an extra log line that lists the
// names of the resources involved.
constexpr std::chrono::milliseconds UpdateDurationLogThreshold{50};
19 :
20 : GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
21 : SubscriptionCallbacks& callbacks,
22 : OpaqueResourceDecoderSharedPtr resource_decoder,
23 : SubscriptionStats stats, absl::string_view type_url,
24 : Event::Dispatcher& dispatcher,
25 : std::chrono::milliseconds init_fetch_timeout,
26 : bool is_aggregated, const SubscriptionOptions& options)
27 : : grpc_mux_(grpc_mux), callbacks_(callbacks), resource_decoder_(resource_decoder),
28 : stats_(stats), type_url_(type_url), dispatcher_(dispatcher),
29 135 : init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated), options_(options) {}
30 :
31 : // Config::Subscription
32 135 : void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
33 135 : if (init_fetch_timeout_.count() > 0) {
34 135 : init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
35 0 : onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
36 0 : });
37 135 : init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
38 135 : }
39 :
40 135 : watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, options_);
41 :
42 : // The attempt stat here is maintained for the purposes of having consistency between ADS and
43 : // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
44 : // "attempt" for a given xDS API combined by ADS is not really that meaningful.
45 135 : stats_.update_attempt_.inc();
46 :
47 : // ADS initial request batching relies on the users of the GrpcMux *not* calling start on it,
48 : // whereas non-ADS xDS users must call it themselves.
49 135 : if (!is_aggregated_) {
50 0 : grpc_mux_->start();
51 0 : }
52 135 : }
53 :
54 : void GrpcSubscriptionImpl::updateResourceInterest(
55 0 : const absl::flat_hash_set<std::string>& update_to_these_names) {
56 0 : watch_->update(update_to_these_names);
57 0 : stats_.update_attempt_.inc();
58 0 : }
59 :
60 : void GrpcSubscriptionImpl::requestOnDemandUpdate(
61 0 : const absl::flat_hash_set<std::string>& for_update) {
62 0 : grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
63 0 : stats_.update_attempt_.inc();
64 0 : }
65 :
66 : // Config::SubscriptionCallbacks
67 : absl::Status
68 : GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
69 198 : const std::string& version_info) {
70 198 : disableInitFetchTimeoutTimer();
71 : // TODO(mattklein123): In the future if we start tracking per-resource versions, we need to
72 : // supply those versions to onConfigUpdate() along with the xDS response ("system")
73 : // version_info. This way, both types of versions can be tracked and exposed for debugging by
74 : // the configuration update targets.
75 198 : auto start = dispatcher_.timeSource().monotonicTime();
76 198 : absl::Status status = callbacks_.onConfigUpdate(resources, version_info);
77 198 : if (!status.ok()) {
78 0 : return status;
79 0 : }
80 198 : std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
81 198 : dispatcher_.timeSource().monotonicTime() - start);
82 198 : stats_.update_success_.inc();
83 198 : stats_.update_attempt_.inc();
84 198 : stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
85 198 : stats_.version_.set(HashUtil::xxHash64(version_info));
86 198 : stats_.version_text_.set(version_info);
87 198 : stats_.update_duration_.recordValue(update_duration.count());
88 198 : ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources with version {}", type_url_,
89 198 : resources.size(), version_info);
90 :
91 198 : if (update_duration > UpdateDurationLogThreshold) {
92 0 : ENVOY_LOG(debug, "gRPC config update took {} ms! Resources names: {}", update_duration.count(),
93 0 : absl::StrJoin(resources, ",", ResourceNameFormatter()));
94 0 : }
95 198 : return absl::OkStatus();
96 198 : }
97 :
98 : absl::Status GrpcSubscriptionImpl::onConfigUpdate(
99 : const std::vector<Config::DecodedResourceRef>& added_resources,
100 : const Protobuf::RepeatedPtrField<std::string>& removed_resources,
101 46 : const std::string& system_version_info) {
102 46 : disableInitFetchTimeoutTimer();
103 46 : stats_.update_attempt_.inc();
104 46 : auto start = dispatcher_.timeSource().monotonicTime();
105 46 : absl::Status status =
106 46 : callbacks_.onConfigUpdate(added_resources, removed_resources, system_version_info);
107 46 : if (!status.ok()) {
108 0 : return status;
109 0 : }
110 46 : std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
111 46 : dispatcher_.timeSource().monotonicTime() - start);
112 46 : stats_.update_success_.inc();
113 46 : stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
114 46 : stats_.version_.set(HashUtil::xxHash64(system_version_info));
115 46 : stats_.version_text_.set(system_version_info);
116 46 : stats_.update_duration_.recordValue(update_duration.count());
117 46 : return absl::OkStatus();
118 46 : }
119 :
120 : void GrpcSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason reason,
121 130 : const EnvoyException* e) {
122 130 : switch (reason) {
123 130 : case Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure:
124 130 : stats_.update_failure_.inc();
125 130 : ENVOY_LOG(debug, "gRPC update for {} failed", type_url_);
126 130 : break;
127 0 : case Envoy::Config::ConfigUpdateFailureReason::FetchTimedout:
128 0 : stats_.init_fetch_timeout_.inc();
129 0 : disableInitFetchTimeoutTimer();
130 0 : ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_);
131 0 : callbacks_.onConfigUpdateFailed(reason, e);
132 0 : break;
133 0 : case Envoy::Config::ConfigUpdateFailureReason::UpdateRejected:
134 : // We expect Envoy exception to be thrown when update is rejected.
135 0 : ASSERT(e != nullptr);
136 0 : disableInitFetchTimeoutTimer();
137 0 : stats_.update_rejected_.inc();
138 0 : ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what());
139 0 : callbacks_.onConfigUpdateFailed(reason, e);
140 0 : break;
141 130 : }
142 :
143 130 : stats_.update_attempt_.inc();
144 130 : }
145 :
146 0 : ScopedResume GrpcSubscriptionImpl::pause() { return grpc_mux_->pause(type_url_); }
147 :
148 244 : void GrpcSubscriptionImpl::disableInitFetchTimeoutTimer() {
149 244 : if (init_fetch_timeout_timer_) {
150 116 : init_fetch_timeout_timer_->disableTimer();
151 116 : init_fetch_timeout_timer_.reset();
152 116 : }
153 244 : }
154 :
155 : GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
156 : const xds::core::v3::ResourceLocator& collection_locator, GrpcMuxSharedPtr grpc_mux,
157 : SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
158 : SubscriptionStats stats, Event::Dispatcher& dispatcher,
159 : std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
160 : const SubscriptionOptions& options)
161 : : GrpcSubscriptionImpl(
162 : grpc_mux, callbacks, resource_decoder, stats,
163 : TypeUtil::descriptorFullNameToTypeUrl(collection_locator.resource_type()), dispatcher,
164 : init_fetch_timeout, is_aggregated, options),
165 0 : collection_locator_(collection_locator) {}
166 :
167 0 : void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
168 0 : ASSERT(resource_names.empty());
169 0 : GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});
170 0 : }
171 :
172 : } // namespace Config
173 : } // namespace Envoy
|