1
#pragma once
2

            
3
#include <chrono>
4
#include <memory>
5

            
6
#include "envoy/config/grpc_mux.h"
7
#include "envoy/config/subscription.h"
8
#include "envoy/event/dispatcher.h"
9

            
10
#include "source/common/common/logger.h"
11

            
12
#include "xds/core/v3/resource_locator.pb.h"
13

            
14
namespace Envoy {
15
namespace Config {
16

            
17
/**
18
 * Adapter from typed Subscription to untyped GrpcMux. Also handles per-xDS API stats/logging.
19
 */
20
class GrpcSubscriptionImpl : public Subscription,
21
                             protected SubscriptionCallbacks,
22
                             Logger::Loggable<Logger::Id::config> {
23
public:
24
  GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
25
                       OpaqueResourceDecoderSharedPtr resource_decoder, SubscriptionStats stats,
26
                       absl::string_view type_url, Event::Dispatcher& dispatcher,
27
                       std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
28
                       const SubscriptionOptions& options);
29

            
30
  // Config::Subscription
31
  void start(const absl::flat_hash_set<std::string>& resource_names) override;
32
  void
33
  updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override;
34
  void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override;
35
  // Config::SubscriptionCallbacks (all pass through to callbacks_!)
36
  absl::Status onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
37
                              const std::string& version_info) override;
38
  absl::Status onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
39
                              const Protobuf::RepeatedPtrField<std::string>& removed_resources,
40
                              const std::string& system_version_info) override;
41
  void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) override;
42

            
43
8
  GrpcMuxSharedPtr grpcMux() { return grpc_mux_; }
44

            
45
  ScopedResume pause();
46

            
47
private:
48
  void disableInitFetchTimeoutTimer();
49

            
50
  GrpcMuxSharedPtr grpc_mux_;
51
  SubscriptionCallbacks& callbacks_;
52
  OpaqueResourceDecoderSharedPtr resource_decoder_;
53
  SubscriptionStats stats_;
54
  const std::string type_url_;
55
  GrpcMuxWatchPtr watch_;
56
  Event::Dispatcher& dispatcher_;
57
  // NOTE: if another subscription of the same type_url has already been started, this value will be
58
  // ignored in favor of the other subscription's.
59
  std::chrono::milliseconds init_fetch_timeout_;
60
  Event::TimerPtr init_fetch_timeout_timer_;
61
  const bool is_aggregated_;
62
  const SubscriptionOptions options_;
63

            
64
  struct ResourceNameFormatter {
65
8
    void operator()(std::string* out, const Config::DecodedResourceRef& resource) {
66
8
      out->append(resource.get().name());
67
8
    }
68
  };
69
};
70

            
71
using GrpcSubscriptionImplPtr = std::unique_ptr<GrpcSubscriptionImpl>;
72
using GrpcSubscriptionImplSharedPtr = std::shared_ptr<GrpcSubscriptionImpl>;
73

            
74
class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl {
75
public:
76
  GrpcCollectionSubscriptionImpl(const xds::core::v3::ResourceLocator& collection_locator,
77
                                 GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
78
                                 OpaqueResourceDecoderSharedPtr resource_decoder,
79
                                 SubscriptionStats stats, Event::Dispatcher& dispatcher,
80
                                 std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
81
                                 const SubscriptionOptions& options);
82

            
83
  void start(const absl::flat_hash_set<std::string>& resource_names) override;
84

            
85
private:
86
  xds::core::v3::ResourceLocator collection_locator_;
87
};
88

            
89
} // namespace Config
90
} // namespace Envoy