1
#include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h"
2

            
3
#include "source/common/config/utility.h"
4
#include "source/extensions/config_subscription/grpc/xds_source_id.h"
5

            
6
namespace Envoy {
7
namespace Config {
8
namespace XdsMux {
9

            
10
// Constructs the state-of-the-world subscription state for one resource type.
//
// type_url, callbacks, dispatcher, xds_config_tracker, xds_resources_delegate and
// target_xds_authority are forwarded unchanged to BaseSubscriptionState.
// resource_decoder is retained by this object and used to decode every resource received in a
// DiscoveryResponse or loaded from the xDS resources delegate.
//
// resource_decoder is taken by value (a sink parameter), so it is moved into the member rather
// than copied; this mirrors how type_url is handed to the base class.
SotwSubscriptionState::SotwSubscriptionState(
    std::string type_url, UntypedConfigUpdateCallbacks& callbacks, Event::Dispatcher& dispatcher,
    OpaqueResourceDecoderSharedPtr resource_decoder, XdsConfigTrackerOptRef xds_config_tracker,
    XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority)
    : BaseSubscriptionState(std::move(type_url), callbacks, dispatcher, xds_config_tracker,
                            xds_resources_delegate, target_xds_authority),
      resource_decoder_(std::move(resource_decoder)) {}
17

            
18
153
// Destructor is defaulted here, out of line, in the implementation file.
SotwSubscriptionState::~SotwSubscriptionState() = default;
19

            
20
void SotwSubscriptionState::updateSubscriptionInterest(
21
    const absl::flat_hash_set<std::string>& cur_added,
22
364
    const absl::flat_hash_set<std::string>& cur_removed) {
23
394
  for (const auto& a : cur_added) {
24
210
    names_tracked_.insert(a);
25
210
  }
26
364
  for (const auto& r : cur_removed) {
27
159
    names_tracked_.erase(r);
28
159
  }
29
364
  if (!cur_added.empty() || !cur_removed.empty()) {
30
257
    update_pending_ = true;
31
257
  }
32
364
}
33

            
34
// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
35
// the entirety of your interest at the start of a stream, even if nothing has changed.
36
2813
bool SotwSubscriptionState::subscriptionUpdatePending() const {
37
2813
  return update_pending_ || dynamicContextChanged();
38
2813
}
39

            
40
176
void SotwSubscriptionState::markStreamFresh(bool) {
41
176
  last_good_nonce_ = absl::nullopt;
42
176
  update_pending_ = true;
43
176
  clearDynamicContextChanged();
44
176
}
45

            
46
void SotwSubscriptionState::handleGoodResponse(
47
1085
    envoy::service::discovery::v3::DiscoveryResponse& message) {
48
1085
  std::vector<DecodedResourcePtr> non_heartbeat_resources;
49

            
50
1085
  {
51
1085
    const auto scoped_update = ttl_.scopedTtlUpdate();
52
1101
    for (const auto& any : message.resources()) {
53
155
      if (!any.Is<envoy::service::discovery::v3::Resource>() &&
54
155
          any.type_url() != message.type_url()) {
55
7
        throwEnvoyExceptionOrPanic(
56
7
            fmt::format("type URL {} embedded in an individual Any does not match "
57
7
                        "the message-wide type URL {} in DiscoveryResponse {}",
58
7
                        any.type_url(), message.type_url(), message.DebugString()));
59
7
      }
60

            
61
148
      auto decoded_resource = THROW_OR_RETURN_VALUE(
62
148
          DecodedResourceImpl::fromResource(*resource_decoder_, any, message.version_info()),
63
148
          DecodedResourceImplPtr);
64
148
      setResourceTtl(*decoded_resource);
65
148
      if (isHeartbeatResource(*decoded_resource, message.version_info())) {
66
3
        continue;
67
3
      }
68
145
      non_heartbeat_resources.push_back(std::move(decoded_resource));
69
145
    }
70
1085
  }
71

            
72
  // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to
73
  // support passing of decoded resources. This would also avoid a resource copy above.
74
1078
  callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info());
75
  // Now that we're passed onConfigUpdate() without an exception thrown, we know we're good.
76
1078
  last_good_version_info_ = message.version_info();
77
1078
  last_good_nonce_ = message.nonce();
78

            
79
  // Processing point when resources are successfully ingested.
80
1078
  if (xds_config_tracker_.has_value()) {
81
4
    xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources);
82
4
  }
83

            
84
  // Send the resources to the xDS delegate, if configured.
85
1078
  if (xds_resources_delegate_.has_value()) {
86
14
    XdsConfigSourceId source_id{target_xds_authority_, message.type_url()};
87
14
    std::vector<DecodedResourceRef> resource_refs;
88
14
    resource_refs.reserve(non_heartbeat_resources.size());
89
20
    for (const DecodedResourcePtr& r : non_heartbeat_resources) {
90
20
      resource_refs.emplace_back(*r);
91
20
    }
92
14
    xds_resources_delegate_->onConfigUpdated(source_id, resource_refs);
93
14
  }
94

            
95
1078
  ENVOY_LOG(debug, "Config update for {} (version {}) accepted with {} resources", typeUrl(),
96
1078
            message.version_info(), message.resources().size());
97
1078
}
98

            
99
165
void SotwSubscriptionState::handleEstablishmentFailure() {
100
165
  BaseSubscriptionState::handleEstablishmentFailure();
101

            
102
165
  if (previously_fetched_data_ || !xds_resources_delegate_.has_value()) {
103
160
    return;
104
160
  }
105

            
106
5
  const XdsConfigSourceId source_id{target_xds_authority_, type_url_};
107
5
  TRY_ASSERT_MAIN_THREAD {
108
5
    std::vector<envoy::service::discovery::v3::Resource> resources =
109
5
        xds_resources_delegate_->getResources(source_id, names_tracked_);
110

            
111
5
    std::vector<DecodedResourcePtr> decoded_resources;
112
5
    const auto scoped_update = ttl_.scopedTtlUpdate();
113
5
    std::string version_info;
114
5
    size_t unaccounted = names_tracked_.size();
115
5
    if (names_tracked_.size() == 1 && names_tracked_.contains(Envoy::Config::Wildcard)) {
116
      // For wildcard requests, there are no expectations for the number of resources returned.
117
1
      unaccounted = 0;
118
1
    }
119

            
120
13
    for (const auto& resource : resources) {
121
13
      if (version_info.empty()) {
122
4
        version_info = resource.version();
123
9
      } else {
124
9
        ASSERT(version_info == resource.version());
125
9
      }
126

            
127
13
      TRY_ASSERT_MAIN_THREAD {
128
13
        auto decoded_resource =
129
13
            THROW_OR_RETURN_VALUE(DecodedResourceImpl::fromResource(*resource_decoder_, resource),
130
13
                                  DecodedResourceImplPtr);
131
13
        setResourceTtl(*decoded_resource);
132
13
        if (unaccounted > 0) {
133
6
          --unaccounted;
134
6
        }
135
13
        decoded_resources.emplace_back(std::move(decoded_resource));
136
13
      }
137
13
      END_TRY
138
13
      CATCH(const EnvoyException& e,
139
13
            { xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e); });
140
13
    }
141

            
142
5
    callbacks().onConfigUpdate(decoded_resources, version_info);
143
5
    previously_fetched_data_ = true;
144
5
    if (unaccounted == 0 && !version_info.empty()) {
145
      // All the requested resources were found and validated from the xDS delegate, so set the last
146
      // known good version.
147
3
      last_good_version_info_ = version_info;
148
3
    }
149
5
  }
150
5
  END_TRY
151
5
  CATCH(const EnvoyException& e, {
152
    // TODO(abeyad): do something more than just logging the error?
153
5
    ENVOY_LOG(warn, "xDS delegate failed onEstablishmentFailure() for {}: {}", source_id.toKey(),
154
5
              e.what());
155
5
  });
156
5
}
157

            
158
std::unique_ptr<envoy::service::discovery::v3::DiscoveryRequest>
159
1400
SotwSubscriptionState::getNextRequestInternal() {
160
1400
  auto request = std::make_unique<envoy::service::discovery::v3::DiscoveryRequest>();
161
1400
  request->set_type_url(typeUrl());
162
1400
  std::copy(names_tracked_.begin(), names_tracked_.end(),
163
1400
            Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names()));
164
1400
  if (last_good_version_info_.has_value()) {
165
1131
    request->set_version_info(last_good_version_info_.value());
166
1131
  }
167
  // Default response_nonce to the last known good one. If we are being called by
168
  // getNextRequestWithAck(), this value will be overwritten.
169
1400
  if (last_good_nonce_.has_value()) {
170
1081
    request->set_response_nonce(last_good_nonce_.value());
171
1081
  }
172

            
173
1400
  update_pending_ = false;
174
1400
  return request;
175
1400
}
176

            
177
3
void SotwSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
178
3
  Protobuf::RepeatedPtrField<std::string> removed_resources;
179
3
  for (const auto& resource : expired) {
180
3
    removed_resources.Add(std::string(resource));
181
3
  }
182
3
  callbacks().onConfigUpdate({}, removed_resources, "");
183
3
}
184

            
185
160
void SotwSubscriptionState::setResourceTtl(const DecodedResourceImpl& decoded_resource) {
186
160
  if (decoded_resource.ttl()) {
187
12
    ttl_.add(std::chrono::milliseconds(*decoded_resource.ttl()), decoded_resource.name());
188
148
  } else {
189
148
    ttl_.clear(decoded_resource.name());
190
148
  }
191
160
}
192

            
193
bool SotwSubscriptionState::isHeartbeatResource(const DecodedResource& resource,
194
148
                                                const std::string& version) {
195
148
  return !resource.hasResource() && last_good_version_info_.has_value() &&
196
148
         version == last_good_version_info_.value();
197
148
}
198

            
199
} // namespace XdsMux
200
} // namespace Config
201
} // namespace Envoy