1
#pragma once
2

            
3
#include <memory>
4
#include <string>
5

            
6
#include "envoy/common/pure.h"
7
#include "envoy/config/subscription.h"
8
#include "envoy/config/xds_config_tracker.h"
9
#include "envoy/config/xds_resources_delegate.h"
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/service/discovery/v3/discovery.pb.h"
12

            
13
#include "source/common/config/ttl.h"
14
#include "source/common/config/utility.h"
15
#include "source/common/protobuf/protobuf.h"
16
#include "source/extensions/config_subscription/grpc/update_ack.h"
17

            
18
#include "absl/strings/string_view.h"
19

            
20
namespace Envoy {
21
namespace Config {
22
namespace XdsMux {
23

            
24
// Empty, non-templated tag type. BaseSubscriptionState<RS, RQ> derives from it, which gives every
// template instantiation a common base type. It declares no members and no virtual functions.
class SubscriptionState {
};
25

            
26
// Tracks the protocol state of an individual ongoing xDS-over-gRPC session, for a single type_url.
27
// There can be multiple SubscriptionStates active, one per type_url. They will all be
28
// blissfully unaware of each other's existence, even when their messages are being multiplexed
29
// together by ADS.
30
// This is the abstract parent class for both the delta and state-of-the-world xDS variants.
31
template <class RS, class RQ>
32
class BaseSubscriptionState : public SubscriptionState,
33
                              public Logger::Loggable<Logger::Id::config> {
34
public:
35
  // Note that, outside of tests, we expect callbacks to always be a WatchMap.
36
  BaseSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks,
37
                        Event::Dispatcher& dispatcher,
38
                        XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt,
39
                        XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt,
40
                        const std::string& target_xds_authority = "")
41
462
      : ttl_([this](const std::vector<std::string>& expired) { ttlExpiryCallback(expired); },
42
462
             dispatcher, dispatcher.timeSource()),
43
462
        type_url_(std::move(type_url)), callbacks_(callbacks), dispatcher_(dispatcher),
44
462
        xds_config_tracker_(xds_config_tracker), xds_resources_delegate_(xds_resources_delegate),
45
462
        target_xds_authority_(target_xds_authority) {}
46

            
47
462
  virtual ~BaseSubscriptionState() = default;
48

            
49
  // Update which resources we're interested in subscribing to.
50
  virtual void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
51
                                          const absl::flat_hash_set<std::string>& cur_removed) PURE;
52

            
53
10
  void setDynamicContextChanged() { dynamic_context_changed_ = true; }
54
2308
  void clearDynamicContextChanged() { dynamic_context_changed_ = false; }
55
7599
  bool dynamicContextChanged() const { return dynamic_context_changed_; }
56

            
57
9
  void setControlPlaneIdentifier(const std::string& id) { control_plane_identifier_ = id; }
58
19
  std::string& controlPlaneIdentifier() { return control_plane_identifier_; }
59

            
60
  // Whether there was a change in our subscription interest we have yet to inform the server of.
61
  virtual bool subscriptionUpdatePending() const PURE;
62

            
63
  virtual void markStreamFresh(bool should_send_initial_resource_versions) PURE;
64

            
65
  // May modify the order of the resources in response_proto to put all the
66
  // non-heartbeat resources first.
67
1483
  UpdateAck handleResponse(RS& response) {
68
    // We *always* copy the response's nonce into the next request, even if we're going to make that
69
    // request a NACK by setting error_detail.
70
1483
    UpdateAck ack(response.nonce(), typeUrl());
71
1483
    ENVOY_LOG(debug, "Handling response for {}", typeUrl());
72
1483
    TRY_ASSERT_MAIN_THREAD { handleGoodResponse(response); }
73
1483
    END_TRY
74
1483
    CATCH(const EnvoyException& e, {
75
1483
      if (xds_config_tracker_.has_value()) {
76
1483
        xds_config_tracker_->onConfigRejected(response,
77
1483
                                              Config::Utility::truncateGrpcStatusMessage(e.what()));
78
1483
      }
79
1483
      handleBadResponse(e, ack);
80
1483
    });
81
1483
    previously_fetched_data_ = true;
82
1483
    return ack;
83
1483
  }
84

            
85
469
  virtual void handleEstablishmentFailure() {
86
469
    ENVOY_LOG(debug, "SubscriptionState establishment failed for {}", typeUrl());
87
469
    callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
88
469
                                     nullptr);
89
469
  }
90

            
91
  // Returns the next gRPC request proto to be sent off to the server, based on this object's
92
  // understanding of the current protocol state, and new resources that Envoy wants to request.
93
1043
  std::unique_ptr<RQ> getNextRequestAckless() { return getNextRequestInternal(); }
94

            
95
  // The WithAck version first calls the ack-less version, then adds in the passed-in ack.
96
  // Returns a new'd pointer, meant to be owned by the caller, who is expected to know what type the
97
  // pointer actually is.
98
1331
  std::unique_ptr<RQ> getNextRequestWithAck(const UpdateAck& ack) {
99
1331
    auto request = getNextRequestInternal();
100
1331
    request->set_response_nonce(ack.nonce_);
101
1331
    ENVOY_LOG(debug, "ACK for {} will have nonce {}", typeUrl(), ack.nonce_);
102
1331
    if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
103
      // Don't needlessly make the field present-but-empty if status is ok.
104
21
      request->mutable_error_detail()->CopyFrom(ack.error_detail_);
105
21
    }
106
1331
    return request;
107
1331
  }
108

            
109
  virtual void ttlExpiryCallback(const std::vector<std::string>& type_url) PURE;
110

            
111
protected:
112
  virtual std::unique_ptr<RQ> getNextRequestInternal() PURE;
113
  // May modify the order of the resources in response_proto to put all the
114
  // non-heartbeat resources first.
115
  virtual void handleGoodResponse(RS& message) PURE;
116
37
  void handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
117
    // Note that error_detail being set is what indicates that a (Delta)DiscoveryRequest is a NACK.
118
37
    ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
119
37
    ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
120
37
    ENVOY_LOG(warn, "Config for {} rejected: {}", typeUrl(), e.what());
121
37
    callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
122
37
  }
123

            
124
3875
  std::string typeUrl() const { return type_url_; }
125
1978
  UntypedConfigUpdateCallbacks& callbacks() const { return callbacks_; }
126

            
127
  TtlManager ttl_;
128
  const std::string type_url_;
129
  // callbacks_ is expected (outside of tests) to be a WatchMap.
130
  UntypedConfigUpdateCallbacks& callbacks_;
131
  Event::Dispatcher& dispatcher_;
132
  bool dynamic_context_changed_{};
133
  std::string control_plane_identifier_{};
134
  XdsConfigTrackerOptRef xds_config_tracker_;
135
  XdsResourcesDelegateOptRef xds_resources_delegate_;
136
  const std::string target_xds_authority_;
137
  bool previously_fetched_data_{};
138
};
139

            
140
template <class T> class SubscriptionStateFactory {
141
public:
142
259
  virtual ~SubscriptionStateFactory() = default;
143
  // Note that, outside of tests, we expect callbacks to always be a WatchMap.
144
  virtual std::unique_ptr<T>
145
  makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks,
146
                        OpaqueResourceDecoderSharedPtr resource_decoder,
147
                        XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt,
148
                        XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt,
149
                        const std::string& target_xds_authority = "") PURE;
150
};
151

            
152
} // namespace XdsMux
153
} // namespace Config
154
} // namespace Envoy