1
#pragma once
2

            
3
#include <set>
4
#include <string>
5
#include <utility>
6

            
7
#include "envoy/config/custom_config_validators.h"
8
#include "envoy/config/eds_resources_cache.h"
9
#include "envoy/config/subscription.h"
10
#include "envoy/service/discovery/v3/discovery.pb.h"
11

            
12
#include "source/common/common/assert.h"
13
#include "source/common/common/logger.h"
14
#include "source/common/config/resource_name.h"
15

            
16
#include "absl/container/flat_hash_map.h"
17
#include "absl/container/flat_hash_set.h"
18

            
19
namespace Envoy {
20
namespace Config {
21

            
22
struct AddedRemoved {
23
  AddedRemoved(absl::flat_hash_set<std::string>&& added, absl::flat_hash_set<std::string>&& removed)
24
2850
      : added_(std::move(added)), removed_(std::move(removed)) {}
25
  absl::flat_hash_set<std::string> added_;
26
  absl::flat_hash_set<std::string> removed_;
27
};
28

            
29
struct Watch {
30
  Watch(SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder)
31
1417
      : callbacks_(callbacks), resource_decoder_(resource_decoder) {}
32
  SubscriptionCallbacks& callbacks_;
33
  OpaqueResourceDecoder& resource_decoder_;
34
  absl::flat_hash_set<std::string> resource_names_;
35
  // Needed only for state-of-the-world.
36
  // Whether the most recent update contained any resources this watch cares about.
37
  // If true, a new update that also contains no resources can skip this watch.
38
  bool state_of_the_world_empty_{true};
39
};
40

            
41
// NOTE: Users are responsible for eventually calling removeWatch() on the Watch* returned
//       by addWatch(). We don't expect there to be new users of this class beyond
//       NewGrpcMuxImpl and DeltaSubscriptionImpl (TODO(fredlas) to be renamed).
//
// Manages "watches" of xDS resources. Several xDS callers might ask for a subscription to the same
// resource name "X". The xDS machinery must return to each their very own subscription to X.
// The xDS machinery's "watch" concept accomplishes that, while avoiding parallel redundant xDS
// requests for X. Each of those subscriptions is viewed as a "watch" on X, while behind the scenes
// there is just a single real subscription to that resource name.
//
// This class maintains the watches<-->subscription mapping: it
// 1) delivers updates to all interested watches, and
// 2) tracks which resource names should be {added to,removed from} the subscription when the
//    {first,last} watch on a resource name is {added,removed}.
//
// #1 is accomplished by WatchMap's implementation of the UntypedConfigUpdateCallbacks interface.
// This interface allows the xDS client to just throw each xDS update message it receives directly
// into WatchMap::onConfigUpdate, rather than having to track the various watches' callbacks.
//
// The information for #2 is returned by updateWatchInterest(); the caller should use it to
// update the subscription accordingly.
//
// A WatchMap is assumed to be dedicated to a single type_url type of resource (EDS, CDS, etc).
//
// The WatchMap can also store the fetched resources in a cache, and allow others to fetch
// resources directly from the cache. This is done for EDS in the following case:
// Assume an active EDS cluster exists with some load-assignment that is kept in the cache.
// If the cluster is updated, and no load-assignment is sent from the xDS server, the
// cached version will be used.
// The WatchMap is responsible for updating the cache with the resource contents, and it is
// up to the specific xDS type subscription handler (i.e., EdsClusterImpl), to fetch
// the resource from the cache.
73
class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable<Logger::Id::config> {
public:
  // use_namespace_matching: stored as-is in use_namespace_matching_.
  // type_url: the single resource type URL this WatchMap is dedicated to (copied).
  // config_validators: stored as a raw, non-owning pointer; may later be replaced via
  //                    setConfigValidators().
  // eds_resources_cache: optional reference to an EDS resources cache.
  WatchMap(const bool use_namespace_matching, const std::string& type_url,
           CustomConfigValidators* config_validators, EdsResourcesCacheOptRef eds_resources_cache)
      : use_namespace_matching_(use_namespace_matching), type_url_(type_url),
        config_validators_(config_validators), eds_resources_cache_(eds_resources_cache) {
    // If eds resources cache is provided, then the type must be ClusterLoadAssignment.
    ASSERT(!eds_resources_cache_.has_value() ||
           (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
  }

  // Adds 'callbacks' to the WatchMap, with every possible resource being watched.
  // (Use updateWatchInterest() to narrow it down to some specific names).
  // Returns the newly added watch, to be used with updateWatchInterest and removeWatch.
  Watch* addWatch(SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder);

  // Updates the set of resource names that the given watch should watch.
  // Returns any resource name additions/removals that are unique across all watches. That is:
  // 1) if 'update_to_these_names' contains X and no other watch cares about X, X will be in
  //    added_.
  // 2) if 'update_to_these_names' does not contain Y, and this watch was the only one that cared
  //    about Y, Y will be in removed_.
  AddedRemoved updateWatchInterest(Watch* watch,
                                   const absl::flat_hash_set<std::string>& update_to_these_names);

  // Expects that the watch to be removed has already had all of its resource names removed via
  // updateWatchInterest().
  void removeWatch(Watch* watch);

  // UntypedConfigUpdateCallbacks.
  // Overload taking the resources as Protobuf::Any messages.
  void onConfigUpdate(const Protobuf::RepeatedPtrField<Protobuf::Any>& resources,
                      const std::string& version_info) override;

  // Overload taking already-decoded resources.
  void onConfigUpdate(const std::vector<DecodedResourcePtr>& resources,
                      const std::string& version_info) override;

  // Overload taking separate lists of added resources and removed resource names.
  void
  onConfigUpdate(absl::Span<const envoy::service::discovery::v3::Resource* const> added_resources,
                 const Protobuf::RepeatedPtrField<std::string>& removed_resources,
                 const std::string& system_version_info) override;
  void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) override;

  // Not copyable.
  WatchMap(const WatchMap&) = delete;
  WatchMap& operator=(const WatchMap&) = delete;

  // Replaces the config validators pointer supplied at construction. The pointer is stored as-is
  // and is not owned by the WatchMap.
  void setConfigValidators(CustomConfigValidators* config_validators) {
    config_validators_ = config_validators;
  }

private:
  // NOTE(review): presumably completes the removal of the watches recorded in
  // deferred_removed_during_update_; the definition is not in this header, so confirm there.
  void removeDeferredWatches();

  // Given a list of names that are new to an individual watch, returns those names that are in fact
  // new to the entire subscription.
  absl::flat_hash_set<std::string>
  findAdditions(const absl::flat_hash_set<std::string>& newly_added_to_watch, Watch* watch);

  // Given a list of names that an individual watch no longer cares about, returns those names that
  // in fact the entire subscription no longer cares about.
  absl::flat_hash_set<std::string>
  findRemovals(const absl::flat_hash_set<std::string>& newly_removed_from_watch, Watch* watch);

  // Returns the union of watch_interest_[resource_name] and wildcard_watches_.
  absl::flat_hash_set<Watch*> watchesInterestedIn(const std::string& resource_name);

  // Owning storage for the watches; the other containers below hold non-owning Watch* pointers.
  absl::flat_hash_set<std::unique_ptr<Watch>> watches_;

  // Watches whose interest set is currently empty, which is interpreted as "everything".
  absl::flat_hash_set<Watch*> wildcard_watches_;

  // Watches that have been removed inside the call stack of the WatchMap's onConfigUpdate(). This
  // can happen when a watch's onConfigUpdate() results in another watch being removed via
  // removeWatch().
  std::unique_ptr<absl::flat_hash_set<Watch*>> deferred_removed_during_update_;

  // Maps a resource name to the set of watches interested in that resource. Has two purposes:
  // 1) Acts as a reference count; no watches care anymore ==> the resource can be removed.
  // 2) Enables efficient lookup of all interested watches when a resource has been updated.
  absl::flat_hash_map<std::string, absl::flat_hash_set<Watch*>> watch_interest_;

  const bool use_namespace_matching_;
  // The resource type URL passed at construction.
  const std::string type_url_;
  // Non-owning; may be replaced via setConfigValidators().
  CustomConfigValidators* config_validators_;
  // Optional EDS resources cache; when present, the constructor asserts that the type URL is
  // ClusterLoadAssignment's.
  EdsResourcesCacheOptRef eds_resources_cache_;
};
157

            
158
} // namespace Config
159
} // namespace Envoy