Line data Source code
1 : #pragma once 2 : 3 : #include <chrono> 4 : #include <memory> 5 : 6 : #include "envoy/config/grpc_mux.h" 7 : #include "envoy/config/subscription.h" 8 : #include "envoy/event/dispatcher.h" 9 : 10 : #include "source/common/common/logger.h" 11 : 12 : #include "xds/core/v3/resource_locator.pb.h" 13 : 14 : namespace Envoy { 15 : namespace Config { 16 : 17 : /** 18 : * Adapter from typed Subscription to untyped GrpcMux. Also handles per-xDS API stats/logging. 19 : */ 20 : class GrpcSubscriptionImpl : public Subscription, 21 : protected SubscriptionCallbacks, 22 : Logger::Loggable<Logger::Id::config> { 23 : public: 24 : GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks, 25 : OpaqueResourceDecoderSharedPtr resource_decoder, SubscriptionStats stats, 26 : absl::string_view type_url, Event::Dispatcher& dispatcher, 27 : std::chrono::milliseconds init_fetch_timeout, bool is_aggregated, 28 : const SubscriptionOptions& options); 29 : 30 : // Config::Subscription 31 : void start(const absl::flat_hash_set<std::string>& resource_names) override; 32 : void 33 : updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override; 34 : void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override; 35 : // Config::SubscriptionCallbacks (all pass through to callbacks_!) 36 : absl::Status onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources, 37 : const std::string& version_info) override; 38 : absl::Status onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources, 39 : const Protobuf::RepeatedPtrField<std::string>& removed_resources, 40 : const std::string& system_version_info) override; 41 : void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) override; 42 : 43 0 : GrpcMuxSharedPtr grpcMux() { return grpc_mux_; } 44 : 45 : ScopedResume pause(); 46 : 47 : private: 48 : void disableInitFetchTimeoutTimer(); 49 : 50 : GrpcMuxSharedPtr grpc_mux_; 51 : SubscriptionCallbacks& callbacks_; 52 : OpaqueResourceDecoderSharedPtr resource_decoder_; 53 : SubscriptionStats stats_; 54 : const std::string type_url_; 55 : GrpcMuxWatchPtr watch_; 56 : Event::Dispatcher& dispatcher_; 57 : // NOTE: if another subscription of the same type_url has already been started, this value will be 58 : // ignored in favor of the other subscription's. 59 : std::chrono::milliseconds init_fetch_timeout_; 60 : Event::TimerPtr init_fetch_timeout_timer_; 61 : const bool is_aggregated_; 62 : const SubscriptionOptions options_; 63 : 64 : struct ResourceNameFormatter { 65 0 : void operator()(std::string* out, const Config::DecodedResourceRef& resource) { 66 0 : out->append(resource.get().name()); 67 0 : } 68 : }; 69 : }; 70 : 71 : using GrpcSubscriptionImplPtr = std::unique_ptr<GrpcSubscriptionImpl>; 72 : using GrpcSubscriptionImplSharedPtr = std::shared_ptr<GrpcSubscriptionImpl>; 73 : 74 : class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl { 75 : public: 76 : GrpcCollectionSubscriptionImpl(const xds::core::v3::ResourceLocator& collection_locator, 77 : GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks, 78 : OpaqueResourceDecoderSharedPtr resource_decoder, 79 : SubscriptionStats stats, Event::Dispatcher& dispatcher, 80 : std::chrono::milliseconds init_fetch_timeout, bool is_aggregated, 81 : const SubscriptionOptions& options); 82 : 83 : void start(const absl::flat_hash_set<std::string>& resource_names) override; 84 : 85 : private: 86 : xds::core::v3::ResourceLocator collection_locator_; 87 : }; 88 : 89 : } // namespace Config 90 : } // namespace Envoy