1
#include "source/extensions/config_subscription/rest/http_subscription_impl.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/service/discovery/v3/discovery.pb.h"
6

            
7
#include "source/common/buffer/buffer_impl.h"
8
#include "source/common/common/assert.h"
9
#include "source/common/common/macros.h"
10
#include "source/common/common/utility.h"
11
#include "source/common/config/decoded_resource_impl.h"
12
#include "source/common/config/utility.h"
13
#include "source/common/http/headers.h"
14
#include "source/common/protobuf/protobuf.h"
15
#include "source/common/protobuf/utility.h"
16

            
17
#include "google/api/annotations.pb.h"
18

            
19
namespace Envoy {
20
namespace Config {
21

            
22
// Constructs the subscription on top of Http::RestApiFetcher and prepares the request template
// (request_) and the request path (path_).
//
// The path is read from the google.api.http annotation attached to `service_method`; the method is
// expected to carry that annotation, with a POST binding and a body of "*".
HttpSubscriptionImpl::HttpSubscriptionImpl(
    const LocalInfo::LocalInfo& local_info, Upstream::ClusterManager& cm,
    const std::string& remote_cluster_name, Event::Dispatcher& dispatcher,
    Random::RandomGenerator& random, std::chrono::milliseconds refresh_interval,
    std::chrono::milliseconds request_timeout, const Protobuf::MethodDescriptor& service_method,
    absl::string_view type_url, SubscriptionCallbacks& callbacks,
    OpaqueResourceDecoderSharedPtr resource_decoder, SubscriptionStats stats,
    std::chrono::milliseconds init_fetch_timeout,
    ProtobufMessage::ValidationVisitor& validation_visitor)
    : Http::RestApiFetcher(cm, remote_cluster_name, dispatcher, random, refresh_interval,
                           request_timeout),
      callbacks_(callbacks), resource_decoder_(resource_decoder), stats_(stats),
      dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout),
      validation_visitor_(validation_visitor) {
  // Seed the request template with the local node identity and the resource type requested.
  request_.mutable_node()->CopyFrom(local_info.node());
  request_.set_type_url(std::string(type_url));

  // Derive the request path from the HTTP annotation on the service method.
  const auto& method_options = service_method.options();
  ASSERT(method_options.HasExtension(google::api::http));
  const auto& rest_binding = method_options.GetExtension(google::api::http);
  path_ = rest_binding.post();
  // The whole request message is expected to be mapped to the HTTP body.
  ASSERT(rest_binding.body() == "*");
}
43

            
44
// Config::Subscription
45
17
// Starts the subscription for `resource_names`.
//
// When a positive initial fetch timeout is configured, a timer is armed that reports
// FetchTimedout through handleFailure() if it fires. The requested names are then stored (sorted)
// in the request template, and initialize() is invoked.
void HttpSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
  const bool has_init_fetch_timeout = init_fetch_timeout_.count() > 0;
  if (has_init_fetch_timeout) {
    init_fetch_timeout_timer_ = dispatcher_.createTimer(
        [this]() { handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr); });
    init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
  }

  // The input is a hash set, so copy it out and sort to provide stable wire ordering.
  Protobuf::RepeatedPtrField<std::string> sorted_names(resource_names.begin(),
                                                       resource_names.end());
  std::sort(sorted_names.begin(), sorted_names.end());
  request_.mutable_resource_names()->Swap(&sorted_names);

  initialize();
}
60

            
61
void HttpSubscriptionImpl::updateResourceInterest(
62
1
    const absl::flat_hash_set<std::string>& update_to_these_names) {
63
1
  Protobuf::RepeatedPtrField<std::string> resources_vector(update_to_these_names.begin(),
64
1
                                                           update_to_these_names.end());
65
  // Sort to provide stable wire ordering.
66
1
  std::sort(resources_vector.begin(), resources_vector.end());
67
1
  request_.mutable_resource_names()->Swap(&resources_vector);
68
1
}
69

            
70
// Http::RestApiFetcher
71
39
// Fills in `request` as a JSON POST to path_ whose body is the current request template
// serialized to JSON. Also counts the attempt in the update_attempt stat.
void HttpSubscriptionImpl::createRequest(Http::RequestMessage& request) {
  ENVOY_LOG(debug, "Sending REST request for {}", path_);
  stats_.update_attempt_.inc();

  auto& headers = request.headers();
  headers.setReferenceMethod(Http::Headers::get().MethodValues.Post);
  headers.setPath(path_);

  auto& body = request.body();
  body.add(MessageUtil::getJsonStringFromMessageOrError(request_));

  headers.setReferenceContentType(Http::Headers::get().ContentTypeValues.Json);
  // Content-Length must be computed after the body has been written.
  headers.setContentLength(body.length());
}
80

            
81
18
// Handles a fetched response in two stages:
//   1. Parse the JSON body into a DiscoveryResponse. A parse error is reported as UpdateRejected
//      and processing stops.
//   2. Decode the resources and hand them to the subscription callbacks. On success the accepted
//      version is recorded in the request template and in stats; any EnvoyException raised in
//      this stage is reported as UpdateRejected.
void HttpSubscriptionImpl::parseResponse(const Http::ResponseMessage& response) {
  // A response has arrived, so the initial fetch timeout timer (if any) is cancelled.
  disableInitFetchTimeoutTimer();

  envoy::service::discovery::v3::DiscoveryResponse discovery_response;
  TRY_ASSERT_MAIN_THREAD {
    MessageUtil::loadFromJson(response.bodyAsString(), discovery_response, validation_visitor_);
  }
  END_TRY
  catch (const EnvoyException& e) {
    handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
    return;
  }

  TRY_ASSERT_MAIN_THREAD {
    const std::string& version = discovery_response.version_info();
    const auto decoded_resources = THROW_OR_RETURN_VALUE(
        DecodedResourcesWrapper::create(*resource_decoder_, discovery_response.resources(),
                                        version),
        std::unique_ptr<DecodedResourcesWrapper>);
    THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources->refvec_, version));

    // The update was accepted: remember the version in the request template and publish stats.
    request_.set_version_info(version);
    stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
    stats_.version_.set(HashUtil::xxHash64(version));
    stats_.version_text_.set(version);
    stats_.update_success_.inc();
  }
  END_TRY
  catch (const EnvoyException& e) {
    handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
  }
}
109

            
110
21
// No-op: this subscription performs no work when a fetch completes.
void HttpSubscriptionImpl::onFetchComplete() {}
111

            
112
void HttpSubscriptionImpl::onFetchFailure(Config::ConfigUpdateFailureReason reason,
113
2
                                          const EnvoyException* e) {
114
2
  handleFailure(reason, e);
115
2
}
116

            
117
void HttpSubscriptionImpl::handleFailure(Config::ConfigUpdateFailureReason reason,
118
9
                                         const EnvoyException* e) {
119

            
120
9
  switch (reason) {
121
2
  case Config::ConfigUpdateFailureReason::ConnectionFailure:
122
2
    ENVOY_LOG(warn, "REST update for {} failed", path_);
123
2
    stats_.update_failure_.inc();
124
2
    break;
125
1
  case Config::ConfigUpdateFailureReason::FetchTimedout:
126
1
    ENVOY_LOG(warn, "REST config: initial fetch timeout for {}", path_);
127
1
    stats_.init_fetch_timeout_.inc();
128
1
    disableInitFetchTimeoutTimer();
129
1
    break;
130
6
  case Config::ConfigUpdateFailureReason::UpdateRejected:
131
6
    ASSERT(e != nullptr);
132
6
    ENVOY_LOG(warn, "REST config for {} rejected: {}", path_, e->what());
133
6
    stats_.update_rejected_.inc();
134
6
    disableInitFetchTimeoutTimer();
135
6
    break;
136
9
  }
137

            
138
9
  if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
139
    // New requests will be sent again.
140
    // If init_fetch_timeout is non-zero, server will continue startup after it timeout
141
2
    return;
142
2
  }
143

            
144
7
  callbacks_.onConfigUpdateFailed(reason, e);
145
7
}
146

            
147
25
void HttpSubscriptionImpl::disableInitFetchTimeoutTimer() {
148
25
  if (init_fetch_timeout_timer_) {
149
3
    init_fetch_timeout_timer_->disableTimer();
150
3
    init_fetch_timeout_timer_.reset();
151
3
  }
152
25
}
153

            
154
std::chrono::milliseconds HttpSubscriptionFactory::apiConfigSourceRefreshDelay(
155
4
    const envoy::config::core::v3::ApiConfigSource& api_config_source) {
156
4
  if (!api_config_source.has_refresh_delay()) {
157
1
    throw EnvoyException("refresh_delay is required for REST API configuration sources");
158
1
  }
159

            
160
3
  return std::chrono::milliseconds(
161
3
      DurationUtil::durationToMilliseconds(api_config_source.refresh_delay()));
162
4
}
163

            
164
std::chrono::milliseconds HttpSubscriptionFactory::apiConfigSourceRequestTimeout(
165
3
    const envoy::config::core::v3::ApiConfigSource& api_config_source) {
166
3
  return std::chrono::milliseconds(
167
3
      PROTOBUF_GET_MS_OR_DEFAULT(api_config_source, request_timeout, 1000));
168
3
}
169

            
170
// Registers HttpSubscriptionFactory in the ConfigSubscriptionFactory category.
// NOTE(review): presumably this is static registration that makes the factory discoverable by
// name at runtime; confirm against the REGISTER_FACTORY macro definition.
REGISTER_FACTORY(HttpSubscriptionFactory, ConfigSubscriptionFactory);
171

            
172
} // namespace Config
173
} // namespace Envoy