1
#pragma once
2

            
3
#include "envoy/grpc/status.h"
4

            
5
#include "source/common/common/assert.h"
6
#include "source/common/common/logger.h"
7
#include "source/common/config/api_version.h"
8
#include "source/extensions/config_subscription/grpc/xds_mux/subscription_state.h"
9

            
10
#include "absl/container/node_hash_map.h"
11
#include "absl/types/optional.h"
12

            
13
namespace Envoy {
14
namespace Config {
15
namespace XdsMux {
16

            
17
// Tracks the state of a delta xDS-over-gRPC protocol session.
18
class DeltaSubscriptionState
19
    : public BaseSubscriptionState<envoy::service::discovery::v3::DeltaDiscoveryResponse,
20
                                   envoy::service::discovery::v3::DeltaDiscoveryRequest> {
21
public:
22
  DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map,
23
                         Event::Dispatcher& dispatcher, XdsConfigTrackerOptRef xds_config_tracker);
24

            
25
  ~DeltaSubscriptionState() override;
26

            
27
  // Update which resources we're interested in subscribing to.
28
  void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
29
                                  const absl::flat_hash_set<std::string>& cur_removed) override;
30

            
31
  // Whether there was a change in our subscription interest we have yet to inform the server of.
32
  bool subscriptionUpdatePending() const override;
33

            
34
  void markStreamFresh(bool should_send_initial_resource_versions) override;
35

            
36
  void ttlExpiryCallback(const std::vector<std::string>& expired) override;
37

            
38
  DeltaSubscriptionState(const DeltaSubscriptionState&) = delete;
39
  DeltaSubscriptionState& operator=(const DeltaSubscriptionState&) = delete;
40

            
41
private:
42
  std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest>
43
  getNextRequestInternal() override;
44

            
45
  void setResourceTtl(const envoy::service::discovery::v3::Resource& resource);
46
  bool isHeartbeatResource(const envoy::service::discovery::v3::Resource& resource) const;
47
  void handleGoodResponse(envoy::service::discovery::v3::DeltaDiscoveryResponse& message) override;
48
  void addResourceStateFromServer(const envoy::service::discovery::v3::Resource& resource);
49

            
50
  class ResourceState {
51
  public:
52
    // Builds a ResourceVersion in the waitingForServer state.
53
706
    ResourceState() = default;
54
    // Builds a ResourceState with a specific version
55
6
    ResourceState(absl::string_view version) : version_(version) {}
56
    // Self-documenting alias of default constructor.
57
706
    static ResourceState waitingForServer() { return {}; }
58
    // Self-documenting alias of constructor with version.
59
6
    static ResourceState withVersion(absl::string_view version) { return {version}; }
60

            
61
    // If true, we currently have no version of this resource - we are waiting for the server to
62
    // provide us with one.
63
1057
    bool isWaitingForServer() const { return version_ == absl::nullopt; }
64

            
65
14
    void setAsWaitingForServer() { version_ = absl::nullopt; }
66
258
    void setVersion(absl::string_view version) { version_ = std::string(version); }
67

            
68
    // Must not be called if waitingForServer() == true.
69
248
    std::string version() const {
70
248
      ASSERT(version_.has_value());
71
248
      return version_.value_or("");
72
248
    }
73

            
74
  private:
75
    absl::optional<std::string> version_;
76
  };
77

            
78
  OptRef<ResourceState> getRequestedResourceState(absl::string_view resource_name);
79
  OptRef<const ResourceState> getRequestedResourceState(absl::string_view resource_name) const;
80

            
81
  bool isInitialRequestForLegacyWildcard();
82

            
83
  // Not all xDS resources support heartbeats due to there being specific information encoded in
84
  // an empty response, which is indistinguishable from a heartbeat in some cases. For now we just
85
  // disable heartbeats for these resources (currently only VHDS).
86
  const bool supports_heartbeats_;
87

            
88
  // A map from resource name to per-resource version. The keys of this map are exactly the resource
89
  // names we are currently interested in. Those in the waitingForServer state currently don't have
90
  // any version for that resource: we need to inform the server if we lose interest in them, but we
91
  // also need to *not* include them in the initial_resource_versions map upon a reconnect.
92
  absl::node_hash_map<std::string, ResourceState> requested_resource_state_;
93
  // A map from resource name to per-resource version. The keys of this map are resource names we
94
  // have received as a part of the wildcard subscription.
95
  absl::node_hash_map<std::string, std::string> wildcard_resource_state_;
96
  // Used for storing resources that we lost interest in, but could
97
  // also be a part of wildcard subscription.
98
  absl::node_hash_map<std::string, std::string> ambiguous_resource_state_;
99

            
100
  bool in_initial_legacy_wildcard_{true};
101
  bool any_request_sent_yet_in_current_stream_{};
102
  bool should_send_initial_resource_versions_{true};
103

            
104
  // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent.
105
  // TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching.
106
  // Feel free to change to an unordered container once we figure out how to make it work.
107
  std::set<std::string> names_added_;
108
  std::set<std::string> names_removed_;
109
};
110

            
111
class DeltaSubscriptionStateFactory : public SubscriptionStateFactory<DeltaSubscriptionState> {
112
public:
113
149
  DeltaSubscriptionStateFactory(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
114
149
  ~DeltaSubscriptionStateFactory() override = default;
115
  std::unique_ptr<DeltaSubscriptionState>
116
  makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks,
117
                        OpaqueResourceDecoderSharedPtr, XdsConfigTrackerOptRef xds_config_tracker,
118
                        XdsResourcesDelegateOptRef /*xds_resources_delegate*/,
119
227
                        const std::string& /*target_xds_authority*/) override {
120
227
    return std::make_unique<DeltaSubscriptionState>(type_url, callbacks, dispatcher_,
121
227
                                                    xds_config_tracker);
122
227
  }
123

            
124
private:
125
  Event::Dispatcher& dispatcher_;
126
};
127

            
128
} // namespace XdsMux
129
} // namespace Config
130
} // namespace Envoy