1
#include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h"
2

            
3
#include "source/common/config/utility.h"
4
#include "source/extensions/config_subscription/grpc/xds_source_id.h"
5

            
6
namespace Envoy {
7
namespace Config {
8
namespace XdsMux {
9

            
10
SotwSubscriptionState::SotwSubscriptionState(
11
    std::string type_url, UntypedConfigUpdateCallbacks& callbacks, Event::Dispatcher& dispatcher,
12
    OpaqueResourceDecoderSharedPtr resource_decoder, XdsConfigTrackerOptRef xds_config_tracker,
13
    XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority)
14
163
    : BaseSubscriptionState(std::move(type_url), callbacks, dispatcher, xds_config_tracker,
15
163
                            xds_resources_delegate, target_xds_authority),
16
163
      resource_decoder_(resource_decoder) {}
17

            
18
163
SotwSubscriptionState::~SotwSubscriptionState() = default;
19

            
20
void SotwSubscriptionState::updateSubscriptionInterest(
21
    const absl::flat_hash_set<std::string>& cur_added,
22
384
    const absl::flat_hash_set<std::string>& cur_removed) {
23
414
  for (const auto& a : cur_added) {
24
220
    names_tracked_.insert(a);
25
220
  }
26
384
  for (const auto& r : cur_removed) {
27
169
    names_tracked_.erase(r);
28
169
  }
29
384
  if (!cur_added.empty() || !cur_removed.empty()) {
30
277
    update_pending_ = true;
31
277
  }
32
384
}
33

            
34
// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
35
// the entirety of your interest at the start of a stream, even if nothing has changed.
36
2872
bool SotwSubscriptionState::subscriptionUpdatePending() const {
37
2872
  return update_pending_ || dynamicContextChanged();
38
2872
}
39

            
40
185
void SotwSubscriptionState::markStreamFresh(bool) {
41
185
  last_good_nonce_ = absl::nullopt;
42
185
  update_pending_ = true;
43
185
  clearDynamicContextChanged();
44
185
}
45

            
46
void SotwSubscriptionState::handleGoodResponse(
47
1096
    envoy::service::discovery::v3::DiscoveryResponse& message) {
48
1096
  std::vector<DecodedResourcePtr> non_heartbeat_resources;
49

            
50
1096
  {
51
1096
    const auto scoped_update = ttl_.scopedTtlUpdate();
52
1112
    for (const auto& any : message.resources()) {
53
166
      if (!any.Is<envoy::service::discovery::v3::Resource>() &&
54
166
          any.type_url() != message.type_url()) {
55
7
        throwEnvoyExceptionOrPanic(
56
7
            fmt::format("type URL {} embedded in an individual Any does not match "
57
7
                        "the message-wide type URL {} in DiscoveryResponse {}",
58
7
                        any.type_url(), message.type_url(), message.DebugString()));
59
7
      }
60

            
61
159
      auto decoded_resource = THROW_OR_RETURN_VALUE(
62
159
          DecodedResourceImpl::fromResource(*resource_decoder_, any, message.version_info()),
63
159
          DecodedResourceImplPtr);
64
159
      setResourceTtl(*decoded_resource);
65
159
      if (isHeartbeatResource(*decoded_resource, message.version_info())) {
66
3
        continue;
67
3
      }
68
156
      non_heartbeat_resources.push_back(std::move(decoded_resource));
69
156
    }
70
1096
  }
71

            
72
  // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to
73
  // support passing of decoded resources. This would also avoid a resource copy above.
74
1089
  callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info());
75
  // Now that we're passed onConfigUpdate() without an exception thrown, we know we're good.
76
1089
  last_good_version_info_ = message.version_info();
77
1089
  last_good_nonce_ = message.nonce();
78

            
79
  // Processing point when resources are successfully ingested.
80
1089
  if (xds_config_tracker_.has_value()) {
81
4
    xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources);
82
4
  }
83

            
84
  // Send the resources to the xDS delegate, if configured.
85
1089
  if (xds_resources_delegate_.has_value()) {
86
14
    XdsConfigSourceId source_id{target_xds_authority_, message.type_url()};
87
14
    std::vector<DecodedResourceRef> resource_refs;
88
14
    resource_refs.reserve(non_heartbeat_resources.size());
89
20
    for (const DecodedResourcePtr& r : non_heartbeat_resources) {
90
20
      resource_refs.emplace_back(*r);
91
20
    }
92
14
    xds_resources_delegate_->onConfigUpdated(source_id, resource_refs);
93
14
  }
94

            
95
1089
  ENVOY_LOG(debug, "Config update for {} (version {}) accepted with {} resources", typeUrl(),
96
1089
            message.version_info(), message.resources().size());
97
1089
}
98

            
99
174
void SotwSubscriptionState::handleEstablishmentFailure() {
100
174
  BaseSubscriptionState::handleEstablishmentFailure();
101

            
102
174
  if (previously_fetched_data_ || !xds_resources_delegate_.has_value()) {
103
169
    return;
104
169
  }
105

            
106
5
  const XdsConfigSourceId source_id{target_xds_authority_, type_url_};
107
5
  TRY_ASSERT_MAIN_THREAD {
108
5
    std::vector<envoy::service::discovery::v3::Resource> resources =
109
5
        xds_resources_delegate_->getResources(source_id, names_tracked_);
110

            
111
5
    std::vector<DecodedResourcePtr> decoded_resources;
112
5
    const auto scoped_update = ttl_.scopedTtlUpdate();
113
5
    std::string version_info;
114
5
    size_t unaccounted = names_tracked_.size();
115
5
    if (names_tracked_.size() == 1 && names_tracked_.contains(Envoy::Config::Wildcard)) {
116
      // For wildcard requests, there are no expectations for the number of resources returned.
117
1
      unaccounted = 0;
118
1
    }
119

            
120
13
    for (const auto& resource : resources) {
121
13
      if (version_info.empty()) {
122
4
        version_info = resource.version();
123
9
      } else {
124
9
        ASSERT(version_info == resource.version());
125
9
      }
126

            
127
13
      TRY_ASSERT_MAIN_THREAD {
128
13
        auto decoded_resource =
129
13
            THROW_OR_RETURN_VALUE(DecodedResourceImpl::fromResource(*resource_decoder_, resource),
130
13
                                  DecodedResourceImplPtr);
131
13
        setResourceTtl(*decoded_resource);
132
13
        if (unaccounted > 0) {
133
6
          --unaccounted;
134
6
        }
135
13
        decoded_resources.emplace_back(std::move(decoded_resource));
136
13
      }
137
13
      END_TRY
138
13
      CATCH(const EnvoyException& e,
139
13
            { xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e); });
140
13
    }
141

            
142
5
    callbacks().onConfigUpdate(decoded_resources, version_info);
143
5
    previously_fetched_data_ = true;
144
5
    if (unaccounted == 0 && !version_info.empty()) {
145
      // All the requested resources were found and validated from the xDS delegate, so set the last
146
      // known good version.
147
3
      last_good_version_info_ = version_info;
148
3
    }
149
5
  }
150
5
  END_TRY
151
5
  CATCH(const EnvoyException& e, {
152
    // TODO(abeyad): do something more than just logging the error?
153
5
    ENVOY_LOG(warn, "xDS delegate failed onEstablishmentFailure() for {}: {}", source_id.toKey(),
154
5
              e.what());
155
5
  });
156
5
}
157

            
158
std::unique_ptr<envoy::service::discovery::v3::DiscoveryRequest>
159
1420
SotwSubscriptionState::getNextRequestInternal() {
160
1420
  auto request = std::make_unique<envoy::service::discovery::v3::DiscoveryRequest>();
161
1420
  request->set_type_url(typeUrl());
162
1420
  std::copy(names_tracked_.begin(), names_tracked_.end(),
163
1420
            Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names()));
164
1420
  if (last_good_version_info_.has_value()) {
165
1142
    request->set_version_info(last_good_version_info_.value());
166
1142
  }
167
  // Default response_nonce to the last known good one. If we are being called by
168
  // getNextRequestWithAck(), this value will be overwritten.
169
1420
  if (last_good_nonce_.has_value()) {
170
1092
    request->set_response_nonce(last_good_nonce_.value());
171
1092
  }
172

            
173
1420
  update_pending_ = false;
174
1420
  return request;
175
1420
}
176

            
177
3
void SotwSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
178
3
  Protobuf::RepeatedPtrField<std::string> removed_resources;
179
3
  for (const auto& resource : expired) {
180
3
    removed_resources.Add(std::string(resource));
181
3
  }
182
3
  callbacks().onConfigUpdate({}, removed_resources, "");
183
3
}
184

            
185
171
void SotwSubscriptionState::setResourceTtl(const DecodedResourceImpl& decoded_resource) {
186
171
  if (decoded_resource.ttl()) {
187
12
    ttl_.add(std::chrono::milliseconds(*decoded_resource.ttl()), decoded_resource.name());
188
159
  } else {
189
159
    ttl_.clear(decoded_resource.name());
190
159
  }
191
171
}
192

            
193
bool SotwSubscriptionState::isHeartbeatResource(const DecodedResource& resource,
194
159
                                                const std::string& version) {
195
159
  return !resource.hasResource() && last_good_version_info_.has_value() &&
196
159
         version == last_good_version_info_.value();
197
159
}
198

            
199
} // namespace XdsMux
200
} // namespace Config
201
} // namespace Envoy