Line data Source code
#include "source/extensions/clusters/original_dst/original_dst_cluster.h"

#include <algorithm>
#include <chrono>
#include <list>
#include <string>
#include <vector>

#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/core/v3/health_check.pb.h"
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
#include "envoy/stats/scope.h"

#include "source/common/http/headers.h"
#include "source/common/network/address_impl.h"
#include "source/common/network/filter_state_dst_address.h"
#include "source/common/network/utility.h"
#include "source/common/protobuf/protobuf.h"
#include "source/common/protobuf/utility.h"
#include "source/common/runtime/runtime_features.h"

21 :
22 : namespace Envoy {
23 : namespace Upstream {
24 :
25 0 : OriginalDstClusterHandle::~OriginalDstClusterHandle() {
26 0 : std::shared_ptr<OriginalDstCluster> cluster = std::move(cluster_);
27 0 : cluster_.reset();
28 0 : Event::Dispatcher& dispatcher = cluster->dispatcher_;
29 0 : dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); });
30 0 : }
31 :
32 0 : HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
33 0 : if (context) {
34 : // Check if filter state override is present, if yes use it before anything else.
35 0 : Network::Address::InstanceConstSharedPtr dst_host = filterStateOverrideHost(context);
36 :
37 : // Check if override host metadata is present, if yes use it otherwise check the header.
38 0 : if (dst_host == nullptr) {
39 0 : dst_host = metadataOverrideHost(context);
40 0 : }
41 :
42 : // Check if override host header is present, if yes use it otherwise check local address.
43 0 : if (dst_host == nullptr) {
44 0 : dst_host = requestOverrideHost(context);
45 0 : }
46 :
47 0 : if (dst_host == nullptr) {
48 0 : const Network::Connection* connection = context->downstreamConnection();
49 : // The local address of the downstream connection is the original destination address,
50 : // if localAddressRestored() returns 'true'.
51 0 : if (connection && connection->connectionInfoProvider().localAddressRestored()) {
52 0 : dst_host = connection->connectionInfoProvider().localAddress();
53 0 : }
54 0 : }
55 0 : if (dst_host && port_override_.has_value()) {
56 0 : dst_host = Network::Utility::getAddressWithPort(*dst_host.get(), port_override_.value());
57 0 : }
58 :
59 0 : if (dst_host) {
60 0 : const Network::Address::Instance& dst_addr = *dst_host.get();
61 : // Check if a host with the destination address is already in the host set.
62 0 : auto it = host_map_->find(dst_addr.asString());
63 0 : if (it != host_map_->end()) {
64 0 : HostConstSharedPtr host = it->second->host_;
65 0 : ENVOY_LOG(trace, "Using existing host {} {}.", *host, host->address()->asString());
66 0 : it->second->used_ = true;
67 0 : return host;
68 0 : }
69 : // Add a new host
70 0 : const Network::Address::Ip* dst_ip = dst_addr.ip();
71 0 : if (dst_ip) {
72 0 : Network::Address::InstanceConstSharedPtr host_ip_port(
73 0 : Network::Utility::copyInternetAddressAndPort(*dst_ip));
74 : // Create a host we can use immediately.
75 0 : auto info = parent_->cluster_->info();
76 0 : HostSharedPtr host(std::make_shared<HostImpl>(
77 0 : info, info->name() + dst_addr.asString(), std::move(host_ip_port), nullptr, 1,
78 0 : envoy::config::core::v3::Locality().default_instance(),
79 0 : envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0,
80 0 : envoy::config::core::v3::UNKNOWN, parent_->cluster_->time_source_));
81 0 : ENVOY_LOG(debug, "Created host {} {}.", *host, host->address()->asString());
82 :
83 : // Tell the cluster about the new host
84 : // lambda cannot capture a member by value.
85 0 : std::weak_ptr<OriginalDstClusterHandle> post_parent = parent_;
86 0 : parent_->cluster_->dispatcher_.post([post_parent, host]() mutable {
87 : // The main cluster may have disappeared while this post was queued.
88 0 : if (std::shared_ptr<OriginalDstClusterHandle> parent = post_parent.lock()) {
89 0 : parent->cluster_->addHost(host);
90 0 : }
91 0 : });
92 0 : return host;
93 0 : } else {
94 0 : ENVOY_LOG(debug, "Failed to create host for {}.", dst_addr.asString());
95 0 : }
96 0 : }
97 0 : }
98 : // TODO(ramaraochavali): add a stat and move this log line to debug.
99 0 : ENVOY_LOG(warn, "original_dst_load_balancer: No downstream connection or no original_dst.");
100 0 : return nullptr;
101 0 : }
102 :
103 : Network::Address::InstanceConstSharedPtr
104 0 : OriginalDstCluster::LoadBalancer::filterStateOverrideHost(LoadBalancerContext* context) {
105 0 : const auto streamInfos = {
106 0 : context->requestStreamInfo(),
107 0 : context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
108 0 : for (const auto streamInfo : streamInfos) {
109 0 : if (streamInfo == nullptr) {
110 0 : continue;
111 0 : }
112 0 : const auto* dst_address = streamInfo->filterState().getDataReadOnly<Network::AddressObject>(
113 0 : OriginalDstClusterFilterStateKey);
114 0 : if (dst_address) {
115 0 : return dst_address->address();
116 0 : }
117 0 : }
118 0 : return nullptr;
119 0 : }
120 :
121 : Network::Address::InstanceConstSharedPtr
122 0 : OriginalDstCluster::LoadBalancer::requestOverrideHost(LoadBalancerContext* context) {
123 0 : if (!http_header_name_.has_value()) {
124 0 : return nullptr;
125 0 : }
126 0 : const Http::HeaderMap* downstream_headers = context->downstreamHeaders();
127 0 : if (!downstream_headers) {
128 0 : return nullptr;
129 0 : }
130 0 : Http::HeaderMap::GetResult override_header = downstream_headers->get(*http_header_name_);
131 0 : if (override_header.empty()) {
132 0 : return nullptr;
133 0 : }
134 : // This is an implicitly untrusted header, so per the API documentation only the first
135 : // value is used.
136 0 : const std::string request_override_host(override_header[0]->value().getStringView());
137 0 : Network::Address::InstanceConstSharedPtr request_host =
138 0 : Network::Utility::parseInternetAddressAndPortNoThrow(request_override_host, false);
139 0 : if (request_host == nullptr) {
140 0 : ENVOY_LOG(debug, "original_dst_load_balancer: invalid override header value. {}",
141 0 : request_override_host);
142 0 : parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
143 0 : return nullptr;
144 0 : }
145 0 : ENVOY_LOG(debug, "Using request override host {}.", request_override_host);
146 0 : return request_host;
147 0 : }
148 :
149 : Network::Address::InstanceConstSharedPtr
150 0 : OriginalDstCluster::LoadBalancer::metadataOverrideHost(LoadBalancerContext* context) {
151 0 : if (!metadata_key_.has_value()) {
152 0 : return nullptr;
153 0 : }
154 0 : const auto streamInfos = {
155 0 : context->requestStreamInfo(),
156 0 : context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
157 0 : const ProtobufWkt::Value* value = nullptr;
158 0 : for (const auto streamInfo : streamInfos) {
159 0 : if (streamInfo == nullptr) {
160 0 : continue;
161 0 : }
162 0 : const auto& metadata = streamInfo->dynamicMetadata();
163 0 : value = &Config::Metadata::metadataValue(&metadata, metadata_key_.value());
164 : // Path can refer to a list, in which case we extract the first element.
165 0 : if (value->kind_case() == ProtobufWkt::Value::kListValue) {
166 0 : const auto& values = value->list_value().values();
167 0 : if (!values.empty()) {
168 0 : value = &(values[0]);
169 0 : }
170 0 : }
171 0 : if (value->kind_case() == ProtobufWkt::Value::kStringValue) {
172 0 : break;
173 0 : }
174 0 : }
175 0 : if (value == nullptr || value->kind_case() != ProtobufWkt::Value::kStringValue) {
176 0 : return nullptr;
177 0 : }
178 0 : const std::string& metadata_override_host = value->string_value();
179 0 : Network::Address::InstanceConstSharedPtr metadata_host =
180 0 : Network::Utility::parseInternetAddressAndPortNoThrow(metadata_override_host, false);
181 0 : if (metadata_host == nullptr) {
182 0 : ENVOY_LOG(debug, "original_dst_load_balancer: invalid override metadata value. {}",
183 0 : metadata_override_host);
184 0 : parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
185 0 : return nullptr;
186 0 : }
187 0 : ENVOY_LOG(debug, "Using metadata override host {}.", metadata_override_host);
188 0 : return metadata_host;
189 0 : }
190 :
191 : OriginalDstCluster::OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config,
192 : ClusterFactoryContext& context)
193 : : ClusterImplBase(config, context),
194 : dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
195 : cleanup_interval_ms_(
196 : std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))),
197 0 : cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })),
198 0 : host_map_(std::make_shared<HostMultiMap>()) {
199 0 : if (const auto& config_opt = info_->lbOriginalDstConfig(); config_opt.has_value()) {
200 0 : if (config_opt->use_http_header()) {
201 0 : http_header_name_ = config_opt->http_header_name().empty()
202 0 : ? Http::Headers::get().EnvoyOriginalDstHost
203 0 : : Http::LowerCaseString(config_opt->http_header_name());
204 0 : }
205 0 : if (config_opt->has_metadata_key()) {
206 0 : metadata_key_ = Config::MetadataKey(config_opt->metadata_key());
207 0 : }
208 0 : if (config_opt->has_upstream_port_override()) {
209 0 : port_override_ = config_opt->upstream_port_override().value();
210 0 : }
211 0 : }
212 0 : cleanup_timer_->enableTimer(cleanup_interval_ms_);
213 0 : }
214 :
215 0 : void OriginalDstCluster::addHost(HostSharedPtr& host) {
216 0 : std::string address = host->address()->asString();
217 0 : HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*getCurrentHostMap());
218 0 : auto it = new_host_map->find(address);
219 0 : if (it != new_host_map->end()) {
220 : // If the entry already exists, that means the worker that posted this host
221 : // had a stale host map. Because the host is potentially in that worker's
222 : // connection pools, we save the host in the host map hosts_ list and the
223 : // cluster priority set. Subsequently, the entire hosts_ list and the
224 : // primary host are removed collectively, once no longer in use.
225 0 : it->second->hosts_.push_back(host);
226 0 : } else {
227 : // The first worker that creates a host for the address defines the primary
228 : // host structure.
229 0 : new_host_map->emplace(address, std::make_shared<HostsForAddress>(host));
230 0 : }
231 0 : ENVOY_LOG(debug, "addHost() adding {} {}.", *host, address);
232 0 : setHostMap(new_host_map);
233 :
234 : // Given the current config, only EDS clusters support multiple priorities.
235 0 : ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
236 0 : const auto& first_host_set = priority_set_.getOrCreateHostSet(0);
237 0 : HostVectorSharedPtr all_hosts(new HostVector(first_host_set.hosts()));
238 0 : all_hosts->emplace_back(host);
239 0 : priority_set_.updateHosts(0,
240 0 : HostSetImpl::partitionHosts(all_hosts, HostsPerLocalityImpl::empty()),
241 0 : {}, {std::move(host)}, {}, absl::nullopt, absl::nullopt);
242 0 : }
243 :
244 0 : void OriginalDstCluster::cleanup() {
245 0 : HostVectorSharedPtr keeping_hosts(new HostVector);
246 0 : HostVector to_be_removed;
247 0 : absl::flat_hash_set<absl::string_view> removed_addresses;
248 0 : auto host_map = getCurrentHostMap();
249 0 : if (!host_map->empty()) {
250 0 : ENVOY_LOG(trace, "Cleaning up stale original dst hosts.");
251 0 : for (const auto& [addr, hosts] : *host_map) {
252 : // Address is kept in the cluster if either of the two things happen:
253 : // 1) a host has been recently selected for the address; 2) none of the
254 : // hosts are currently in any of the connection pools.
255 : // The set of hosts for a single address are treated as a unit.
256 : //
257 : // Using the used_ bit is preserved for backwards compatibility and to
258 : // add a delay between load balancers choosing a host and grabbing a
259 : // handle on the host. This prevents the following interleaving:
260 : //
261 : // 1) worker 1: pools release host h
262 : // 2) worker 1: auto h = lb.chooseHost(&ctx);
263 : // 3) main: cleanup() // deletes h because h is not used by the pools
264 : // 4) worker 1: auto handle = h.acquireHandle();
265 : //
266 : // Because the duration between steps 2) and 4) is O(instructions), step
267 : // 3) will not delete h since it takes at least one cleanup_interval for
268 : // the host to set used_ bit for h to false.
269 0 : bool keep = false;
270 0 : if (hosts->used_) {
271 0 : keep = true;
272 0 : hosts->used_ = false; // Mark to be removed during the next round.
273 0 : } else if (Runtime::runtimeFeatureEnabled(
274 0 : "envoy.reloadable_features.original_dst_rely_on_idle_timeout")) {
275 : // Check that all hosts (first, as well as others that may have been added concurrently)
276 : // are not in use by any connection pool.
277 0 : if (hosts->host_->used()) {
278 0 : keep = true;
279 0 : } else {
280 0 : for (const auto& host : hosts->hosts_) {
281 0 : if (host->used()) {
282 0 : keep = true;
283 0 : break;
284 0 : }
285 0 : }
286 0 : }
287 0 : }
288 0 : if (keep) {
289 0 : ENVOY_LOG(trace, "Keeping active address {}.", addr);
290 0 : keeping_hosts->emplace_back(hosts->host_);
291 0 : if (!hosts->hosts_.empty()) {
292 0 : keeping_hosts->insert(keeping_hosts->end(), hosts->hosts_.begin(), hosts->hosts_.end());
293 0 : }
294 0 : } else {
295 0 : ENVOY_LOG(trace, "Removing stale address {}.", addr);
296 0 : removed_addresses.insert(addr);
297 0 : to_be_removed.emplace_back(hosts->host_);
298 0 : if (!hosts->hosts_.empty()) {
299 0 : to_be_removed.insert(to_be_removed.end(), hosts->hosts_.begin(), hosts->hosts_.end());
300 0 : }
301 0 : }
302 0 : }
303 0 : }
304 0 : if (!to_be_removed.empty()) {
305 0 : HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*host_map);
306 0 : for (const auto& addr : removed_addresses) {
307 0 : new_host_map->erase(addr);
308 0 : }
309 0 : setHostMap(new_host_map);
310 0 : priority_set_.updateHosts(
311 0 : 0, HostSetImpl::partitionHosts(keeping_hosts, HostsPerLocalityImpl::empty()), {}, {},
312 0 : to_be_removed, false, absl::nullopt);
313 0 : }
314 :
315 0 : cleanup_timer_->enableTimer(cleanup_interval_ms_);
316 0 : }
317 :
318 : absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
319 : OriginalDstClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
320 0 : ClusterFactoryContext& context) {
321 0 : if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
322 0 : return absl::InvalidArgumentError(
323 0 : fmt::format("cluster: LB policy {} is not valid for Cluster type {}. Only "
324 0 : "'CLUSTER_PROVIDED' is allowed with cluster type 'ORIGINAL_DST'",
325 0 : envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy()),
326 0 : envoy::config::cluster::v3::Cluster::DiscoveryType_Name(cluster.type())));
327 0 : }
328 :
329 0 : if (cluster.has_load_assignment()) {
330 0 : return absl::InvalidArgumentError(
331 0 : "ORIGINAL_DST clusters must have no load assignment configured");
332 0 : }
333 :
334 0 : if (!cluster.original_dst_lb_config().use_http_header() &&
335 0 : !cluster.original_dst_lb_config().http_header_name().empty()) {
336 0 : return absl::InvalidArgumentError(fmt::format(
337 0 : "ORIGINAL_DST cluster: invalid config http_header_name={} and use_http_header is "
338 0 : "false. Set use_http_header to true if http_header_name is desired.",
339 0 : cluster.original_dst_lb_config().http_header_name()));
340 0 : }
341 :
342 : // TODO(mattklein123): The original DST load balancer type should be deprecated and instead
343 : // the cluster should directly supply the load balancer. This will remove
344 : // a special case and allow this cluster to be compiled out as an extension.
345 0 : auto new_cluster = std::shared_ptr<OriginalDstCluster>(new OriginalDstCluster(cluster, context));
346 0 : auto lb = std::make_unique<OriginalDstCluster::ThreadAwareLoadBalancer>(
347 0 : std::make_shared<OriginalDstClusterHandle>(new_cluster));
348 0 : return std::make_pair(new_cluster, std::move(lb));
349 0 : }
350 :
/**
 * Static registration for the original dst cluster factory. @see RegisterFactory.
 * Registers OriginalDstClusterFactory as an implementation of ClusterFactory.
 */
REGISTER_FACTORY(OriginalDstClusterFactory, ClusterFactory);
355 :
356 : class OriginalDstClusterFilterStateFactory : public Network::BaseAddressObjectFactory {
357 : public:
358 2 : std::string name() const override { return std::string(OriginalDstClusterFilterStateKey); }
359 : };
360 :
// Static registration of the filter state object factory above as a
// StreamInfo::FilterState::ObjectFactory, under the name returned by its name().
REGISTER_FACTORY(OriginalDstClusterFilterStateFactory, StreamInfo::FilterState::ObjectFactory);
362 :
363 : } // namespace Upstream
364 : } // namespace Envoy
|