1
#pragma once
2

            
3
#include "envoy/config/subscription.h"
4
#include "envoy/config/xds_config_tracker.h"
5
#include "envoy/event/dispatcher.h"
6
#include "envoy/grpc/status.h"
7
#include "envoy/service/discovery/v3/discovery.pb.h"
8

            
9
#include "source/common/common/assert.h"
10
#include "source/common/common/logger.h"
11
#include "source/common/config/api_version.h"
12
#include "source/common/config/ttl.h"
13
#include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
14
#include "source/extensions/config_subscription/grpc/watch_map.h"
15

            
16
#include "absl/container/node_hash_map.h"
17

            
18
namespace Envoy {
19
namespace Config {
20

            
21
// Tracks the xDS protocol state of an individual ongoing delta xDS session, i.e. a single type_url.
// There can be multiple DeltaSubscriptionStates active. They will always all be blissfully
// unaware of each other's existence, even when their messages are being multiplexed together by
// ADS.
//
// There are two scenarios which affect how DeltaSubscriptionState manages the resources. First
// scenario is when we are subscribed to a wildcard resource, and other scenario is when we are not.
//
// Delta subscription state also divides the resources it cached into three categories: requested,
// wildcard and ambiguous.
//
// The "requested" category is for resources that we have explicitly asked for (either through the
// initial set of resources or through the on-demand mechanism). Resources in this category are in
// one of two states: "complete" and "waiting for server".
//
// "Complete" resources are resources about which the server sent us the information we need (for
// now - just resource version).
//
// The "waiting for server" state is either for resources that we have just requested, but we still
// didn't receive any version information from the server, or for the "complete" resources that,
// according to the server, are gone, but we are still interested in them - in such case we strip
// the information from the resource.
//
// The "wildcard" category is for resources that we are not explicitly interested in, but we are
// indirectly interested through the subscription to the wildcard resource.
//
// The "ambiguous" category is for resources that we stopped being interested in, but we may still
// be interested indirectly through the wildcard subscription. This situation happens because of the
// xDS protocol limitation - the server isn't able to tell us that the resource we subscribed to is
// also a part of our wildcard subscription. So when we unsubscribe from the resource, we need to
// receive a confirmation from the server whether to keep the resource (which means that it was a
// part of our wildcard subscription) or to drop it.
//
// Please refer to drawings (non-wildcard-resource-state-machine.png and
// wildcard-resource-state-machine.png) for visual depictions of the resource state machine.
//
// In the "no wildcard subscription" scenario all the cached resources should be in the "requested"
// category. Resources are added to the category upon the explicit request and dropped when we
// explicitly unsubscribe from it. Transitions between "complete" and "waiting for server" happen
// when we receive messages from the server - if a resource in the message is in "added resources"
// list (thus contains version information), the resource becomes "complete". If the resource in the
// message is in "removed resources" list, it changes into the "waiting for server" state. If a
// server sends us a resource that we didn't request, it's going to be ignored.
//
// In the "wildcard subscription" scenario, "requested" category is the same as in "no wildcard
// subscription" scenario, with one exception - the unsubscribed "complete" resource is not removed
// from the cache, but it's moved to the "ambiguous" resources instead. At this point we are waiting
// for the server to tell us that this resource should be either moved to the "wildcard" resources,
// or dropped. Resources in "wildcard" category are only added there or dropped from there by the
// server. Resources from both "wildcard" and "ambiguous" categories can become "requested"
// "complete" resources if we subscribe to them again.
//
// The delta subscription state transitions between the two scenarios depending on whether we are
// subscribed to wildcard resource or not. Nothing special happens when we transition from "no
// wildcard subscription" to "wildcard subscription" scenario, but when transitioning in the other
// direction, we drop all the resources in "wildcard" and "ambiguous" categories.
class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
public:
  // Builds the protocol state for one delta xDS subscription.
  // NOTE(review): the constructor is defined out of line; presumably type_url, watch_map and
  // xds_config_tracker end up in the correspondingly named members below - confirm in the .cc.
  DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map,
                         Event::Dispatcher& dispatcher, XdsConfigTrackerOptRef xds_config_tracker);

  // Update which resources we're interested in subscribing to.
  void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
                                  const absl::flat_hash_set<std::string>& cur_removed);

  // Accessors for the dynamic_context_changed_ flag: query it, raise it, and reset it.
  bool dynamicContextChanged() const { return dynamic_context_changed_; }
  void setDynamicContextChanged() { dynamic_context_changed_ = true; }
  void clearDynamicContextChanged() { dynamic_context_changed_ = false; }

  // Whether there was a change in our subscription interest we have yet to inform the server of.
  bool subscriptionUpdatePending() const;

  // Marks the stream as fresh for the next reconnection attempt. If
  // should_send_initial_resource_versions is true, then the next request will
  // also populate the initial_resource_versions field in the first request (if
  // there are relevant resources).
  void markStreamFresh(bool should_send_initial_resource_versions);

  // Processes a response from the server and returns the ack to send back.
  // May modify the order of resources to put all the non-heartbeat resources first.
  UpdateAck handleResponse(envoy::service::discovery::v3::DeltaDiscoveryResponse& message);

  // NOTE(review): presumably invoked when the stream could not be established; the actions taken
  // are defined in the .cc file and are not visible from this header.
  void handleEstablishmentFailure();

  // Returns the next gRPC request proto to be sent off to the server, based on this object's
  // understanding of the current protocol state, and new resources that Envoy wants to request.
  envoy::service::discovery::v3::DeltaDiscoveryRequest getNextRequestAckless();

  // The WithAck version first calls the Ack-less version, then adds in the passed-in ack.
  envoy::service::discovery::v3::DeltaDiscoveryRequest getNextRequestWithAck(const UpdateAck& ack);

  // Not copyable.
  DeltaSubscriptionState(const DeltaSubscriptionState&) = delete;
  DeltaSubscriptionState& operator=(const DeltaSubscriptionState&) = delete;

private:
  // NOTE(review): the three helpers below are defined out of line. Judging by their names they
  // classify a resource as a heartbeat and apply / reject a received response - confirm in the
  // .cc file.
  bool isHeartbeatResponse(const envoy::service::discovery::v3::Resource& resource) const;
  void handleGoodResponse(envoy::service::discovery::v3::DeltaDiscoveryResponse& message);
  void handleBadResponse(const EnvoyException& e, UpdateAck& ack);

  // Per-resource bookkeeping for the "requested" category: either holds the version the server
  // gave us, or holds nothing, which means we are still waiting for the server.
  class ResourceState {
  public:
    // Builds a ResourceState in the waitingForServer state.
    ResourceState() = default;
    // Builds a ResourceState with a specific version. Deliberately left implicit: withVersion()
    // below relies on copy-list-initialization from a string_view.
    ResourceState(absl::string_view version) : version_(version) {}
    // Self-documenting alias of default constructor.
    static ResourceState waitingForServer() { return {}; }
    // Self-documenting alias of constructor with version.
    static ResourceState withVersion(absl::string_view version) { return {version}; }

    // If true, we currently have no version of this resource - we are waiting for the server to
    // provide us with one.
    bool isWaitingForServer() const { return !version_.has_value(); }

    // Drops any stored version, putting the resource back into the waiting-for-server state.
    void setAsWaitingForServer() { version_ = absl::nullopt; }
    // Records the given version (copied into an owned string).
    void setVersion(absl::string_view version) { version_ = std::string(version); }

    // Must not be called if isWaitingForServer() == true.
    std::string version() const {
      ASSERT(version_.has_value());
      // value_or keeps this well-defined (empty string) should the precondition be violated in a
      // build where ASSERT does not abort.
      return version_.value_or("");
    }

  private:
    // Empty while waiting for the server; otherwise the resource version.
    absl::optional<std::string> version_;
  };

  // NOTE(review): defined out of line; presumably records the version of a resource received
  // from the server into the appropriate map below - confirm in the .cc file.
  void addResourceStateFromServer(const envoy::service::discovery::v3::Resource& resource);
  // Look up a resource by name among the requested resources (mutable and const flavours). The
  // OptRef return type allows for "no such resource".
  OptRef<ResourceState> getRequestedResourceState(absl::string_view resource_name);
  OptRef<const ResourceState> getRequestedResourceState(absl::string_view resource_name) const;

  // NOTE(review): non-const, so it may update state (likely in_initial_legacy_wildcard_) -
  // confirm in the .cc file.
  bool isInitialRequestForLegacyWildcard();

  // A map from resource name to per-resource version. The keys of this map are exactly the resource
  // names we are currently interested in. Those in the waitingForServer state currently don't have
  // any version for that resource: we need to inform the server if we lose interest in them, but we
  // also need to *not* include them in the initial_resource_versions map upon a reconnect.
  absl::node_hash_map<std::string, ResourceState> requested_resource_state_;
  // A map from resource name to per-resource version. The keys of this map are resource names we
  // have received as a part of the wildcard subscription.
  absl::node_hash_map<std::string, std::string> wildcard_resource_state_;
  // Used for storing resources that we lost interest in, but could
  // also be a part of wildcard subscription.
  absl::node_hash_map<std::string, std::string> ambiguous_resource_state_;

  // Not all xDS resources support heartbeats due to there being specific information encoded in
  // an empty response, which is indistinguishable from a heartbeat in some cases. For now we just
  // disable heartbeats for these resources (currently only VHDS).
  const bool supports_heartbeats_;
  TtlManager ttl_;

  // The type URL of the resources this subscription state tracks.
  const std::string type_url_;
  UntypedConfigUpdateCallbacks& watch_map_;
  XdsConfigTrackerOptRef xds_config_tracker_;

  // NOTE(review): the semantics of the next three flags are defined by the .cc file. From the
  // names: whether we are still in the initial (legacy) wildcard mode, whether a request has
  // already gone out on the current stream, and whether the first request on a stream should carry
  // initial_resource_versions (see markStreamFresh()) - confirm.
  bool in_initial_legacy_wildcard_{true};
  bool any_request_sent_yet_in_current_stream_{false};
  bool should_send_initial_resource_versions_{true};
  // Read, set and cleared through the *DynamicContextChanged() accessors above.
  bool dynamic_context_changed_{false};

  // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent.
  // TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching.
  // Feel free to change to an unordered container once we figure out how to make it work.
  std::set<std::string> names_added_;
  std::set<std::string> names_removed_;
};
185

            
186
} // namespace Config
187
} // namespace Envoy