Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/clusters/original_dst/original_dst_cluster.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/clusters/original_dst/original_dst_cluster.h"
2
3
#include <chrono>
4
#include <list>
5
#include <string>
6
#include <vector>
7
8
#include "envoy/config/cluster/v3/cluster.pb.h"
9
#include "envoy/config/core/v3/base.pb.h"
10
#include "envoy/config/core/v3/health_check.pb.h"
11
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
12
#include "envoy/stats/scope.h"
13
14
#include "source/common/http/headers.h"
15
#include "source/common/network/address_impl.h"
16
#include "source/common/network/filter_state_dst_address.h"
17
#include "source/common/network/utility.h"
18
#include "source/common/protobuf/protobuf.h"
19
#include "source/common/protobuf/utility.h"
20
#include "source/common/runtime/runtime_features.h"
21
22
namespace Envoy {
23
namespace Upstream {
24
25
0
OriginalDstClusterHandle::~OriginalDstClusterHandle() {
26
0
  std::shared_ptr<OriginalDstCluster> cluster = std::move(cluster_);
27
0
  cluster_.reset();
28
0
  Event::Dispatcher& dispatcher = cluster->dispatcher_;
29
0
  dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); });
30
0
}
31
32
0
HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
33
0
  if (context) {
34
    // Check if filter state override is present, if yes use it before anything else.
35
0
    Network::Address::InstanceConstSharedPtr dst_host = filterStateOverrideHost(context);
36
37
    // Check if override host metadata is present, if yes use it otherwise check the header.
38
0
    if (dst_host == nullptr) {
39
0
      dst_host = metadataOverrideHost(context);
40
0
    }
41
42
    // Check if override host header is present, if yes use it otherwise check local address.
43
0
    if (dst_host == nullptr) {
44
0
      dst_host = requestOverrideHost(context);
45
0
    }
46
47
0
    if (dst_host == nullptr) {
48
0
      const Network::Connection* connection = context->downstreamConnection();
49
      // The local address of the downstream connection is the original destination address,
50
      // if localAddressRestored() returns 'true'.
51
0
      if (connection && connection->connectionInfoProvider().localAddressRestored()) {
52
0
        dst_host = connection->connectionInfoProvider().localAddress();
53
0
      }
54
0
    }
55
0
    if (dst_host && port_override_.has_value()) {
56
0
      dst_host = Network::Utility::getAddressWithPort(*dst_host.get(), port_override_.value());
57
0
    }
58
59
0
    if (dst_host) {
60
0
      const Network::Address::Instance& dst_addr = *dst_host.get();
61
      // Check if a host with the destination address is already in the host set.
62
0
      auto it = host_map_->find(dst_addr.asString());
63
0
      if (it != host_map_->end()) {
64
0
        HostConstSharedPtr host = it->second->host_;
65
0
        ENVOY_LOG(trace, "Using existing host {} {}.", *host, host->address()->asString());
66
0
        it->second->used_ = true;
67
0
        return host;
68
0
      }
69
      // Add a new host
70
0
      const Network::Address::Ip* dst_ip = dst_addr.ip();
71
0
      if (dst_ip) {
72
0
        Network::Address::InstanceConstSharedPtr host_ip_port(
73
0
            Network::Utility::copyInternetAddressAndPort(*dst_ip));
74
        // Create a host we can use immediately.
75
0
        auto info = parent_->cluster_->info();
76
0
        HostSharedPtr host(std::make_shared<HostImpl>(
77
0
            info, info->name() + dst_addr.asString(), std::move(host_ip_port), nullptr, 1,
78
0
            envoy::config::core::v3::Locality().default_instance(),
79
0
            envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0,
80
0
            envoy::config::core::v3::UNKNOWN, parent_->cluster_->time_source_));
81
0
        ENVOY_LOG(debug, "Created host {} {}.", *host, host->address()->asString());
82
83
        // Tell the cluster about the new host
84
        // lambda cannot capture a member by value.
85
0
        std::weak_ptr<OriginalDstClusterHandle> post_parent = parent_;
86
0
        parent_->cluster_->dispatcher_.post([post_parent, host]() mutable {
87
          // The main cluster may have disappeared while this post was queued.
88
0
          if (std::shared_ptr<OriginalDstClusterHandle> parent = post_parent.lock()) {
89
0
            parent->cluster_->addHost(host);
90
0
          }
91
0
        });
92
0
        return host;
93
0
      } else {
94
0
        ENVOY_LOG(debug, "Failed to create host for {}.", dst_addr.asString());
95
0
      }
96
0
    }
97
0
  }
98
  // TODO(ramaraochavali): add a stat and move this log line to debug.
99
0
  ENVOY_LOG(warn, "original_dst_load_balancer: No downstream connection or no original_dst.");
100
0
  return nullptr;
101
0
}
102
103
Network::Address::InstanceConstSharedPtr
104
0
OriginalDstCluster::LoadBalancer::filterStateOverrideHost(LoadBalancerContext* context) {
105
0
  const auto streamInfos = {
106
0
      context->requestStreamInfo(),
107
0
      context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
108
0
  for (const auto streamInfo : streamInfos) {
109
0
    if (streamInfo == nullptr) {
110
0
      continue;
111
0
    }
112
0
    const auto* dst_address = streamInfo->filterState().getDataReadOnly<Network::AddressObject>(
113
0
        OriginalDstClusterFilterStateKey);
114
0
    if (dst_address) {
115
0
      return dst_address->address();
116
0
    }
117
0
  }
118
0
  return nullptr;
119
0
}
120
121
Network::Address::InstanceConstSharedPtr
122
0
OriginalDstCluster::LoadBalancer::requestOverrideHost(LoadBalancerContext* context) {
123
0
  if (!http_header_name_.has_value()) {
124
0
    return nullptr;
125
0
  }
126
0
  const Http::HeaderMap* downstream_headers = context->downstreamHeaders();
127
0
  if (!downstream_headers) {
128
0
    return nullptr;
129
0
  }
130
0
  Http::HeaderMap::GetResult override_header = downstream_headers->get(*http_header_name_);
131
0
  if (override_header.empty()) {
132
0
    return nullptr;
133
0
  }
134
  // This is an implicitly untrusted header, so per the API documentation only the first
135
  // value is used.
136
0
  const std::string request_override_host(override_header[0]->value().getStringView());
137
0
  Network::Address::InstanceConstSharedPtr request_host =
138
0
      Network::Utility::parseInternetAddressAndPortNoThrow(request_override_host, false);
139
0
  if (request_host == nullptr) {
140
0
    ENVOY_LOG(debug, "original_dst_load_balancer: invalid override header value. {}",
141
0
              request_override_host);
142
0
    parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
143
0
    return nullptr;
144
0
  }
145
0
  ENVOY_LOG(debug, "Using request override host {}.", request_override_host);
146
0
  return request_host;
147
0
}
148
149
Network::Address::InstanceConstSharedPtr
150
0
OriginalDstCluster::LoadBalancer::metadataOverrideHost(LoadBalancerContext* context) {
151
0
  if (!metadata_key_.has_value()) {
152
0
    return nullptr;
153
0
  }
154
0
  const auto streamInfos = {
155
0
      context->requestStreamInfo(),
156
0
      context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
157
0
  const ProtobufWkt::Value* value = nullptr;
158
0
  for (const auto streamInfo : streamInfos) {
159
0
    if (streamInfo == nullptr) {
160
0
      continue;
161
0
    }
162
0
    const auto& metadata = streamInfo->dynamicMetadata();
163
0
    value = &Config::Metadata::metadataValue(&metadata, metadata_key_.value());
164
    // Path can refer to a list, in which case we extract the first element.
165
0
    if (value->kind_case() == ProtobufWkt::Value::kListValue) {
166
0
      const auto& values = value->list_value().values();
167
0
      if (!values.empty()) {
168
0
        value = &(values[0]);
169
0
      }
170
0
    }
171
0
    if (value->kind_case() == ProtobufWkt::Value::kStringValue) {
172
0
      break;
173
0
    }
174
0
  }
175
0
  if (value == nullptr || value->kind_case() != ProtobufWkt::Value::kStringValue) {
176
0
    return nullptr;
177
0
  }
178
0
  const std::string& metadata_override_host = value->string_value();
179
0
  Network::Address::InstanceConstSharedPtr metadata_host =
180
0
      Network::Utility::parseInternetAddressAndPortNoThrow(metadata_override_host, false);
181
0
  if (metadata_host == nullptr) {
182
0
    ENVOY_LOG(debug, "original_dst_load_balancer: invalid override metadata value. {}",
183
0
              metadata_override_host);
184
0
    parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
185
0
    return nullptr;
186
0
  }
187
0
  ENVOY_LOG(debug, "Using metadata override host {}.", metadata_override_host);
188
0
  return metadata_host;
189
0
}
190
191
OriginalDstCluster::OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config,
192
                                       ClusterFactoryContext& context)
193
    : ClusterImplBase(config, context),
194
      dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
195
      cleanup_interval_ms_(
196
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))),
197
0
      cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })),
198
0
      host_map_(std::make_shared<HostMultiMap>()) {
199
0
  if (const auto& config_opt = info_->lbOriginalDstConfig(); config_opt.has_value()) {
200
0
    if (config_opt->use_http_header()) {
201
0
      http_header_name_ = config_opt->http_header_name().empty()
202
0
                              ? Http::Headers::get().EnvoyOriginalDstHost
203
0
                              : Http::LowerCaseString(config_opt->http_header_name());
204
0
    }
205
0
    if (config_opt->has_metadata_key()) {
206
0
      metadata_key_ = Config::MetadataKey(config_opt->metadata_key());
207
0
    }
208
0
    if (config_opt->has_upstream_port_override()) {
209
0
      port_override_ = config_opt->upstream_port_override().value();
210
0
    }
211
0
  }
212
0
  cleanup_timer_->enableTimer(cleanup_interval_ms_);
213
0
}
214
215
0
void OriginalDstCluster::addHost(HostSharedPtr& host) {
216
0
  std::string address = host->address()->asString();
217
0
  HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*getCurrentHostMap());
218
0
  auto it = new_host_map->find(address);
219
0
  if (it != new_host_map->end()) {
220
    // If the entry already exists, that means the worker that posted this host
221
    // had a stale host map. Because the host is potentially in that worker's
222
    // connection pools, we save the host in the host map hosts_ list and the
223
    // cluster priority set. Subsequently, the entire hosts_ list and the
224
    // primary host are removed collectively, once no longer in use.
225
0
    it->second->hosts_.push_back(host);
226
0
  } else {
227
    // The first worker that creates a host for the address defines the primary
228
    // host structure.
229
0
    new_host_map->emplace(address, std::make_shared<HostsForAddress>(host));
230
0
  }
231
0
  ENVOY_LOG(debug, "addHost() adding {} {}.", *host, address);
232
0
  setHostMap(new_host_map);
233
234
  // Given the current config, only EDS clusters support multiple priorities.
235
0
  ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
236
0
  const auto& first_host_set = priority_set_.getOrCreateHostSet(0);
237
0
  HostVectorSharedPtr all_hosts(new HostVector(first_host_set.hosts()));
238
0
  all_hosts->emplace_back(host);
239
0
  priority_set_.updateHosts(0,
240
0
                            HostSetImpl::partitionHosts(all_hosts, HostsPerLocalityImpl::empty()),
241
0
                            {}, {std::move(host)}, {}, absl::nullopt, absl::nullopt);
242
0
}
243
244
0
void OriginalDstCluster::cleanup() {
245
0
  HostVectorSharedPtr keeping_hosts(new HostVector);
246
0
  HostVector to_be_removed;
247
0
  absl::flat_hash_set<absl::string_view> removed_addresses;
248
0
  auto host_map = getCurrentHostMap();
249
0
  if (!host_map->empty()) {
250
0
    ENVOY_LOG(trace, "Cleaning up stale original dst hosts.");
251
0
    for (const auto& [addr, hosts] : *host_map) {
252
      // Address is kept in the cluster if either of the two things happen:
253
      // 1) a host has been recently selected for the address; 2) none of the
254
      // hosts are currently in any of the connection pools.
255
      // The set of hosts for a single address are treated as a unit.
256
      //
257
      // Using the used_ bit is preserved for backwards compatibility and to
258
      // add a delay between load balancers choosing a host and grabbing a
259
      // handle on the host. This prevents the following interleaving:
260
      //
261
      // 1) worker 1: pools release host h
262
      // 2) worker 1: auto h = lb.chooseHost(&ctx);
263
      // 3) main: cleanup() // deletes h because h is not used by the pools
264
      // 4) worker 1: auto handle = h.acquireHandle();
265
      //
266
      // Because the duration between steps 2) and 4) is O(instructions), step
267
      // 3) will not delete h since it takes at least one cleanup_interval for
268
      // the host to set used_ bit for h to false.
269
0
      bool keep = false;
270
0
      if (hosts->used_) {
271
0
        keep = true;
272
0
        hosts->used_ = false; // Mark to be removed during the next round.
273
0
      } else if (Runtime::runtimeFeatureEnabled(
274
0
                     "envoy.reloadable_features.original_dst_rely_on_idle_timeout")) {
275
        // Check that all hosts (first, as well as others that may have been added concurrently)
276
        // are not in use by any connection pool.
277
0
        if (hosts->host_->used()) {
278
0
          keep = true;
279
0
        } else {
280
0
          for (const auto& host : hosts->hosts_) {
281
0
            if (host->used()) {
282
0
              keep = true;
283
0
              break;
284
0
            }
285
0
          }
286
0
        }
287
0
      }
288
0
      if (keep) {
289
0
        ENVOY_LOG(trace, "Keeping active address {}.", addr);
290
0
        keeping_hosts->emplace_back(hosts->host_);
291
0
        if (!hosts->hosts_.empty()) {
292
0
          keeping_hosts->insert(keeping_hosts->end(), hosts->hosts_.begin(), hosts->hosts_.end());
293
0
        }
294
0
      } else {
295
0
        ENVOY_LOG(trace, "Removing stale address {}.", addr);
296
0
        removed_addresses.insert(addr);
297
0
        to_be_removed.emplace_back(hosts->host_);
298
0
        if (!hosts->hosts_.empty()) {
299
0
          to_be_removed.insert(to_be_removed.end(), hosts->hosts_.begin(), hosts->hosts_.end());
300
0
        }
301
0
      }
302
0
    }
303
0
  }
304
0
  if (!to_be_removed.empty()) {
305
0
    HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*host_map);
306
0
    for (const auto& addr : removed_addresses) {
307
0
      new_host_map->erase(addr);
308
0
    }
309
0
    setHostMap(new_host_map);
310
0
    priority_set_.updateHosts(
311
0
        0, HostSetImpl::partitionHosts(keeping_hosts, HostsPerLocalityImpl::empty()), {}, {},
312
0
        to_be_removed, false, absl::nullopt);
313
0
  }
314
315
0
  cleanup_timer_->enableTimer(cleanup_interval_ms_);
316
0
}
317
318
absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
319
OriginalDstClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
320
0
                                             ClusterFactoryContext& context) {
321
0
  if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
322
0
    return absl::InvalidArgumentError(
323
0
        fmt::format("cluster: LB policy {} is not valid for Cluster type {}. Only "
324
0
                    "'CLUSTER_PROVIDED' is allowed with cluster type 'ORIGINAL_DST'",
325
0
                    envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy()),
326
0
                    envoy::config::cluster::v3::Cluster::DiscoveryType_Name(cluster.type())));
327
0
  }
328
329
0
  if (cluster.has_load_assignment()) {
330
0
    return absl::InvalidArgumentError(
331
0
        "ORIGINAL_DST clusters must have no load assignment configured");
332
0
  }
333
334
0
  if (!cluster.original_dst_lb_config().use_http_header() &&
335
0
      !cluster.original_dst_lb_config().http_header_name().empty()) {
336
0
    return absl::InvalidArgumentError(fmt::format(
337
0
        "ORIGINAL_DST cluster: invalid config http_header_name={} and use_http_header is "
338
0
        "false. Set use_http_header to true if http_header_name is desired.",
339
0
        cluster.original_dst_lb_config().http_header_name()));
340
0
  }
341
342
  // TODO(mattklein123): The original DST load balancer type should be deprecated and instead
343
  //                     the cluster should directly supply the load balancer. This will remove
344
  //                     a special case and allow this cluster to be compiled out as an extension.
345
0
  auto new_cluster = std::shared_ptr<OriginalDstCluster>(new OriginalDstCluster(cluster, context));
346
0
  auto lb = std::make_unique<OriginalDstCluster::ThreadAwareLoadBalancer>(
347
0
      std::make_shared<OriginalDstClusterHandle>(new_cluster));
348
0
  return std::make_pair(new_cluster, std::move(lb));
349
0
}
350
351
/**
352
 * Static registration for the original dst cluster factory. @see RegisterFactory.
353
 */
354
REGISTER_FACTORY(OriginalDstClusterFactory, ClusterFactory);
355
356
class OriginalDstClusterFilterStateFactory : public Network::BaseAddressObjectFactory {
357
public:
358
8
  std::string name() const override { return std::string(OriginalDstClusterFilterStateKey); }
359
};
360
361
REGISTER_FACTORY(OriginalDstClusterFilterStateFactory, StreamInfo::FilterState::ObjectFactory);
362
363
} // namespace Upstream
364
} // namespace Envoy