/proc/self/cwd/source/extensions/clusters/original_dst/original_dst_cluster.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/clusters/original_dst/original_dst_cluster.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <list> |
5 | | #include <string> |
6 | | #include <vector> |
7 | | |
8 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
9 | | #include "envoy/config/core/v3/base.pb.h" |
10 | | #include "envoy/config/core/v3/health_check.pb.h" |
11 | | #include "envoy/config/endpoint/v3/endpoint_components.pb.h" |
12 | | #include "envoy/stats/scope.h" |
13 | | |
14 | | #include "source/common/http/headers.h" |
15 | | #include "source/common/network/address_impl.h" |
16 | | #include "source/common/network/filter_state_dst_address.h" |
17 | | #include "source/common/network/utility.h" |
18 | | #include "source/common/protobuf/protobuf.h" |
19 | | #include "source/common/protobuf/utility.h" |
20 | | #include "source/common/runtime/runtime_features.h" |
21 | | |
22 | | namespace Envoy { |
23 | | namespace Upstream { |
24 | | |
25 | 0 | OriginalDstClusterHandle::~OriginalDstClusterHandle() { |
26 | 0 | std::shared_ptr<OriginalDstCluster> cluster = std::move(cluster_); |
27 | 0 | cluster_.reset(); |
28 | 0 | Event::Dispatcher& dispatcher = cluster->dispatcher_; |
29 | 0 | dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); }); |
30 | 0 | } |
31 | | |
32 | 0 | HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) { |
33 | 0 | if (context) { |
34 | | // Check if filter state override is present, if yes use it before anything else. |
35 | 0 | Network::Address::InstanceConstSharedPtr dst_host = filterStateOverrideHost(context); |
36 | | |
37 | | // Check if override host metadata is present, if yes use it otherwise check the header. |
38 | 0 | if (dst_host == nullptr) { |
39 | 0 | dst_host = metadataOverrideHost(context); |
40 | 0 | } |
41 | | |
42 | | // Check if override host header is present, if yes use it otherwise check local address. |
43 | 0 | if (dst_host == nullptr) { |
44 | 0 | dst_host = requestOverrideHost(context); |
45 | 0 | } |
46 | |
|
47 | 0 | if (dst_host == nullptr) { |
48 | 0 | const Network::Connection* connection = context->downstreamConnection(); |
49 | | // The local address of the downstream connection is the original destination address, |
50 | | // if localAddressRestored() returns 'true'. |
51 | 0 | if (connection && connection->connectionInfoProvider().localAddressRestored()) { |
52 | 0 | dst_host = connection->connectionInfoProvider().localAddress(); |
53 | 0 | } |
54 | 0 | } |
55 | 0 | if (dst_host && port_override_.has_value()) { |
56 | 0 | dst_host = Network::Utility::getAddressWithPort(*dst_host.get(), port_override_.value()); |
57 | 0 | } |
58 | |
|
59 | 0 | if (dst_host) { |
60 | 0 | const Network::Address::Instance& dst_addr = *dst_host.get(); |
61 | | // Check if a host with the destination address is already in the host set. |
62 | 0 | auto it = host_map_->find(dst_addr.asString()); |
63 | 0 | if (it != host_map_->end()) { |
64 | 0 | HostConstSharedPtr host = it->second->host_; |
65 | 0 | ENVOY_LOG(trace, "Using existing host {} {}.", *host, host->address()->asString()); |
66 | 0 | it->second->used_ = true; |
67 | 0 | return host; |
68 | 0 | } |
69 | | // Add a new host |
70 | 0 | const Network::Address::Ip* dst_ip = dst_addr.ip(); |
71 | 0 | if (dst_ip) { |
72 | 0 | Network::Address::InstanceConstSharedPtr host_ip_port( |
73 | 0 | Network::Utility::copyInternetAddressAndPort(*dst_ip)); |
74 | | // Create a host we can use immediately. |
75 | 0 | auto info = parent_->cluster_->info(); |
76 | 0 | HostSharedPtr host(std::make_shared<HostImpl>( |
77 | 0 | info, info->name() + dst_addr.asString(), std::move(host_ip_port), nullptr, 1, |
78 | 0 | envoy::config::core::v3::Locality().default_instance(), |
79 | 0 | envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0, |
80 | 0 | envoy::config::core::v3::UNKNOWN, parent_->cluster_->time_source_)); |
81 | 0 | ENVOY_LOG(debug, "Created host {} {}.", *host, host->address()->asString()); |
82 | | |
83 | | // Tell the cluster about the new host |
84 | | // lambda cannot capture a member by value. |
85 | 0 | std::weak_ptr<OriginalDstClusterHandle> post_parent = parent_; |
86 | 0 | parent_->cluster_->dispatcher_.post([post_parent, host]() mutable { |
87 | | // The main cluster may have disappeared while this post was queued. |
88 | 0 | if (std::shared_ptr<OriginalDstClusterHandle> parent = post_parent.lock()) { |
89 | 0 | parent->cluster_->addHost(host); |
90 | 0 | } |
91 | 0 | }); |
92 | 0 | return host; |
93 | 0 | } else { |
94 | 0 | ENVOY_LOG(debug, "Failed to create host for {}.", dst_addr.asString()); |
95 | 0 | } |
96 | 0 | } |
97 | 0 | } |
98 | | // TODO(ramaraochavali): add a stat and move this log line to debug. |
99 | 0 | ENVOY_LOG(warn, "original_dst_load_balancer: No downstream connection or no original_dst."); |
100 | 0 | return nullptr; |
101 | 0 | } |
102 | | |
103 | | Network::Address::InstanceConstSharedPtr |
104 | 0 | OriginalDstCluster::LoadBalancer::filterStateOverrideHost(LoadBalancerContext* context) { |
105 | 0 | const auto streamInfos = { |
106 | 0 | context->requestStreamInfo(), |
107 | 0 | context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr}; |
108 | 0 | for (const auto streamInfo : streamInfos) { |
109 | 0 | if (streamInfo == nullptr) { |
110 | 0 | continue; |
111 | 0 | } |
112 | 0 | const auto* dst_address = streamInfo->filterState().getDataReadOnly<Network::AddressObject>( |
113 | 0 | OriginalDstClusterFilterStateKey); |
114 | 0 | if (dst_address) { |
115 | 0 | return dst_address->address(); |
116 | 0 | } |
117 | 0 | } |
118 | 0 | return nullptr; |
119 | 0 | } |
120 | | |
121 | | Network::Address::InstanceConstSharedPtr |
122 | 0 | OriginalDstCluster::LoadBalancer::requestOverrideHost(LoadBalancerContext* context) { |
123 | 0 | if (!http_header_name_.has_value()) { |
124 | 0 | return nullptr; |
125 | 0 | } |
126 | 0 | const Http::HeaderMap* downstream_headers = context->downstreamHeaders(); |
127 | 0 | if (!downstream_headers) { |
128 | 0 | return nullptr; |
129 | 0 | } |
130 | 0 | Http::HeaderMap::GetResult override_header = downstream_headers->get(*http_header_name_); |
131 | 0 | if (override_header.empty()) { |
132 | 0 | return nullptr; |
133 | 0 | } |
134 | | // This is an implicitly untrusted header, so per the API documentation only the first |
135 | | // value is used. |
136 | 0 | const std::string request_override_host(override_header[0]->value().getStringView()); |
137 | 0 | Network::Address::InstanceConstSharedPtr request_host = |
138 | 0 | Network::Utility::parseInternetAddressAndPortNoThrow(request_override_host, false); |
139 | 0 | if (request_host == nullptr) { |
140 | 0 | ENVOY_LOG(debug, "original_dst_load_balancer: invalid override header value. {}", |
141 | 0 | request_override_host); |
142 | 0 | parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc(); |
143 | 0 | return nullptr; |
144 | 0 | } |
145 | 0 | ENVOY_LOG(debug, "Using request override host {}.", request_override_host); |
146 | 0 | return request_host; |
147 | 0 | } |
148 | | |
149 | | Network::Address::InstanceConstSharedPtr |
150 | 0 | OriginalDstCluster::LoadBalancer::metadataOverrideHost(LoadBalancerContext* context) { |
151 | 0 | if (!metadata_key_.has_value()) { |
152 | 0 | return nullptr; |
153 | 0 | } |
154 | 0 | const auto streamInfos = { |
155 | 0 | context->requestStreamInfo(), |
156 | 0 | context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr}; |
157 | 0 | const ProtobufWkt::Value* value = nullptr; |
158 | 0 | for (const auto streamInfo : streamInfos) { |
159 | 0 | if (streamInfo == nullptr) { |
160 | 0 | continue; |
161 | 0 | } |
162 | 0 | const auto& metadata = streamInfo->dynamicMetadata(); |
163 | 0 | value = &Config::Metadata::metadataValue(&metadata, metadata_key_.value()); |
164 | | // Path can refer to a list, in which case we extract the first element. |
165 | 0 | if (value->kind_case() == ProtobufWkt::Value::kListValue) { |
166 | 0 | const auto& values = value->list_value().values(); |
167 | 0 | if (!values.empty()) { |
168 | 0 | value = &(values[0]); |
169 | 0 | } |
170 | 0 | } |
171 | 0 | if (value->kind_case() == ProtobufWkt::Value::kStringValue) { |
172 | 0 | break; |
173 | 0 | } |
174 | 0 | } |
175 | 0 | if (value == nullptr || value->kind_case() != ProtobufWkt::Value::kStringValue) { |
176 | 0 | return nullptr; |
177 | 0 | } |
178 | 0 | const std::string& metadata_override_host = value->string_value(); |
179 | 0 | Network::Address::InstanceConstSharedPtr metadata_host = |
180 | 0 | Network::Utility::parseInternetAddressAndPortNoThrow(metadata_override_host, false); |
181 | 0 | if (metadata_host == nullptr) { |
182 | 0 | ENVOY_LOG(debug, "original_dst_load_balancer: invalid override metadata value. {}", |
183 | 0 | metadata_override_host); |
184 | 0 | parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc(); |
185 | 0 | return nullptr; |
186 | 0 | } |
187 | 0 | ENVOY_LOG(debug, "Using metadata override host {}.", metadata_override_host); |
188 | 0 | return metadata_host; |
189 | 0 | } |
190 | | |
191 | | OriginalDstCluster::OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config, |
192 | | ClusterFactoryContext& context) |
193 | | : ClusterImplBase(config, context), |
194 | | dispatcher_(context.serverFactoryContext().mainThreadDispatcher()), |
195 | | cleanup_interval_ms_( |
196 | | std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))), |
197 | 0 | cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })), |
198 | 0 | host_map_(std::make_shared<HostMultiMap>()) { |
199 | 0 | if (const auto& config_opt = info_->lbOriginalDstConfig(); config_opt.has_value()) { |
200 | 0 | if (config_opt->use_http_header()) { |
201 | 0 | http_header_name_ = config_opt->http_header_name().empty() |
202 | 0 | ? Http::Headers::get().EnvoyOriginalDstHost |
203 | 0 | : Http::LowerCaseString(config_opt->http_header_name()); |
204 | 0 | } |
205 | 0 | if (config_opt->has_metadata_key()) { |
206 | 0 | metadata_key_ = Config::MetadataKey(config_opt->metadata_key()); |
207 | 0 | } |
208 | 0 | if (config_opt->has_upstream_port_override()) { |
209 | 0 | port_override_ = config_opt->upstream_port_override().value(); |
210 | 0 | } |
211 | 0 | } |
212 | 0 | cleanup_timer_->enableTimer(cleanup_interval_ms_); |
213 | 0 | } |
214 | | |
215 | 0 | void OriginalDstCluster::addHost(HostSharedPtr& host) { |
216 | 0 | std::string address = host->address()->asString(); |
217 | 0 | HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*getCurrentHostMap()); |
218 | 0 | auto it = new_host_map->find(address); |
219 | 0 | if (it != new_host_map->end()) { |
220 | | // If the entry already exists, that means the worker that posted this host |
221 | | // had a stale host map. Because the host is potentially in that worker's |
222 | | // connection pools, we save the host in the host map hosts_ list and the |
223 | | // cluster priority set. Subsequently, the entire hosts_ list and the |
224 | | // primary host are removed collectively, once no longer in use. |
225 | 0 | it->second->hosts_.push_back(host); |
226 | 0 | } else { |
227 | | // The first worker that creates a host for the address defines the primary |
228 | | // host structure. |
229 | 0 | new_host_map->emplace(address, std::make_shared<HostsForAddress>(host)); |
230 | 0 | } |
231 | 0 | ENVOY_LOG(debug, "addHost() adding {} {}.", *host, address); |
232 | 0 | setHostMap(new_host_map); |
233 | | |
234 | | // Given the current config, only EDS clusters support multiple priorities. |
235 | 0 | ASSERT(priority_set_.hostSetsPerPriority().size() == 1); |
236 | 0 | const auto& first_host_set = priority_set_.getOrCreateHostSet(0); |
237 | 0 | HostVectorSharedPtr all_hosts(new HostVector(first_host_set.hosts())); |
238 | 0 | all_hosts->emplace_back(host); |
239 | 0 | priority_set_.updateHosts(0, |
240 | 0 | HostSetImpl::partitionHosts(all_hosts, HostsPerLocalityImpl::empty()), |
241 | 0 | {}, {std::move(host)}, {}, absl::nullopt, absl::nullopt); |
242 | 0 | } |
243 | | |
244 | 0 | void OriginalDstCluster::cleanup() { |
245 | 0 | HostVectorSharedPtr keeping_hosts(new HostVector); |
246 | 0 | HostVector to_be_removed; |
247 | 0 | absl::flat_hash_set<absl::string_view> removed_addresses; |
248 | 0 | auto host_map = getCurrentHostMap(); |
249 | 0 | if (!host_map->empty()) { |
250 | 0 | ENVOY_LOG(trace, "Cleaning up stale original dst hosts."); |
251 | 0 | for (const auto& [addr, hosts] : *host_map) { |
252 | | // Address is kept in the cluster if either of the two things happen: |
253 | | // 1) a host has been recently selected for the address; 2) none of the |
254 | | // hosts are currently in any of the connection pools. |
255 | | // The set of hosts for a single address are treated as a unit. |
256 | | // |
257 | | // Using the used_ bit is preserved for backwards compatibility and to |
258 | | // add a delay between load balancers choosing a host and grabbing a |
259 | | // handle on the host. This prevents the following interleaving: |
260 | | // |
261 | | // 1) worker 1: pools release host h |
262 | | // 2) worker 1: auto h = lb.chooseHost(&ctx); |
263 | | // 3) main: cleanup() // deletes h because h is not used by the pools |
264 | | // 4) worker 1: auto handle = h.acquireHandle(); |
265 | | // |
266 | | // Because the duration between steps 2) and 4) is O(instructions), step |
267 | | // 3) will not delete h since it takes at least one cleanup_interval for |
268 | | // the host to set used_ bit for h to false. |
269 | 0 | bool keep = false; |
270 | 0 | if (hosts->used_) { |
271 | 0 | keep = true; |
272 | 0 | hosts->used_ = false; // Mark to be removed during the next round. |
273 | 0 | } else if (Runtime::runtimeFeatureEnabled( |
274 | 0 | "envoy.reloadable_features.original_dst_rely_on_idle_timeout")) { |
275 | | // Check that all hosts (first, as well as others that may have been added concurrently) |
276 | | // are not in use by any connection pool. |
277 | 0 | if (hosts->host_->used()) { |
278 | 0 | keep = true; |
279 | 0 | } else { |
280 | 0 | for (const auto& host : hosts->hosts_) { |
281 | 0 | if (host->used()) { |
282 | 0 | keep = true; |
283 | 0 | break; |
284 | 0 | } |
285 | 0 | } |
286 | 0 | } |
287 | 0 | } |
288 | 0 | if (keep) { |
289 | 0 | ENVOY_LOG(trace, "Keeping active address {}.", addr); |
290 | 0 | keeping_hosts->emplace_back(hosts->host_); |
291 | 0 | if (!hosts->hosts_.empty()) { |
292 | 0 | keeping_hosts->insert(keeping_hosts->end(), hosts->hosts_.begin(), hosts->hosts_.end()); |
293 | 0 | } |
294 | 0 | } else { |
295 | 0 | ENVOY_LOG(trace, "Removing stale address {}.", addr); |
296 | 0 | removed_addresses.insert(addr); |
297 | 0 | to_be_removed.emplace_back(hosts->host_); |
298 | 0 | if (!hosts->hosts_.empty()) { |
299 | 0 | to_be_removed.insert(to_be_removed.end(), hosts->hosts_.begin(), hosts->hosts_.end()); |
300 | 0 | } |
301 | 0 | } |
302 | 0 | } |
303 | 0 | } |
304 | 0 | if (!to_be_removed.empty()) { |
305 | 0 | HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*host_map); |
306 | 0 | for (const auto& addr : removed_addresses) { |
307 | 0 | new_host_map->erase(addr); |
308 | 0 | } |
309 | 0 | setHostMap(new_host_map); |
310 | 0 | priority_set_.updateHosts( |
311 | 0 | 0, HostSetImpl::partitionHosts(keeping_hosts, HostsPerLocalityImpl::empty()), {}, {}, |
312 | 0 | to_be_removed, false, absl::nullopt); |
313 | 0 | } |
314 | |
|
315 | 0 | cleanup_timer_->enableTimer(cleanup_interval_ms_); |
316 | 0 | } |
317 | | |
318 | | absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>> |
319 | | OriginalDstClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster, |
320 | 0 | ClusterFactoryContext& context) { |
321 | 0 | if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) { |
322 | 0 | return absl::InvalidArgumentError( |
323 | 0 | fmt::format("cluster: LB policy {} is not valid for Cluster type {}. Only " |
324 | 0 | "'CLUSTER_PROVIDED' is allowed with cluster type 'ORIGINAL_DST'", |
325 | 0 | envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy()), |
326 | 0 | envoy::config::cluster::v3::Cluster::DiscoveryType_Name(cluster.type()))); |
327 | 0 | } |
328 | | |
329 | 0 | if (cluster.has_load_assignment()) { |
330 | 0 | return absl::InvalidArgumentError( |
331 | 0 | "ORIGINAL_DST clusters must have no load assignment configured"); |
332 | 0 | } |
333 | | |
334 | 0 | if (!cluster.original_dst_lb_config().use_http_header() && |
335 | 0 | !cluster.original_dst_lb_config().http_header_name().empty()) { |
336 | 0 | return absl::InvalidArgumentError(fmt::format( |
337 | 0 | "ORIGINAL_DST cluster: invalid config http_header_name={} and use_http_header is " |
338 | 0 | "false. Set use_http_header to true if http_header_name is desired.", |
339 | 0 | cluster.original_dst_lb_config().http_header_name())); |
340 | 0 | } |
341 | | |
342 | | // TODO(mattklein123): The original DST load balancer type should be deprecated and instead |
343 | | // the cluster should directly supply the load balancer. This will remove |
344 | | // a special case and allow this cluster to be compiled out as an extension. |
345 | 0 | auto new_cluster = std::shared_ptr<OriginalDstCluster>(new OriginalDstCluster(cluster, context)); |
346 | 0 | auto lb = std::make_unique<OriginalDstCluster::ThreadAwareLoadBalancer>( |
347 | 0 | std::make_shared<OriginalDstClusterHandle>(new_cluster)); |
348 | 0 | return std::make_pair(new_cluster, std::move(lb)); |
349 | 0 | } |
350 | | |
351 | | /** |
352 | | * Static registration for the original dst cluster factory. @see RegisterFactory. |
353 | | */ |
354 | | REGISTER_FACTORY(OriginalDstClusterFactory, ClusterFactory); |
355 | | |
356 | | class OriginalDstClusterFilterStateFactory : public Network::BaseAddressObjectFactory { |
357 | | public: |
358 | 8 | std::string name() const override { return std::string(OriginalDstClusterFilterStateKey); } |
359 | | }; |
360 | | |
361 | | REGISTER_FACTORY(OriginalDstClusterFilterStateFactory, StreamInfo::FilterState::ObjectFactory); |
362 | | |
363 | | } // namespace Upstream |
364 | | } // namespace Envoy |