1
#include "source/common/upstream/upstream_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <limits>
6
#include <list>
7
#include <memory>
8
#include <string>
9
#include <vector>
10

            
11
#include "envoy/common/optref.h"
12
#include "envoy/config/cluster/v3/circuit_breaker.pb.h"
13
#include "envoy/config/cluster/v3/cluster.pb.h"
14
#include "envoy/config/core/v3/address.pb.h"
15
#include "envoy/config/core/v3/base.pb.h"
16
#include "envoy/config/core/v3/health_check.pb.h"
17
#include "envoy/config/core/v3/protocol.pb.h"
18
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
19
#include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h"
20
#include "envoy/event/dispatcher.h"
21
#include "envoy/event/timer.h"
22
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
23
#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h"
24
#include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h"
25
#include "envoy/init/manager.h"
26
#include "envoy/network/dns.h"
27
#include "envoy/network/transport_socket.h"
28
#include "envoy/registry/registry.h"
29
#include "envoy/secret/secret_manager.h"
30
#include "envoy/server/filter_config.h"
31
#include "envoy/server/transport_socket_config.h"
32
#include "envoy/ssl/context_manager.h"
33
#include "envoy/stats/scope.h"
34
#include "envoy/upstream/health_checker.h"
35
#include "envoy/upstream/upstream.h"
36

            
37
#include "source/common/common/dns_utils.h"
38
#include "source/common/common/enum_to_int.h"
39
#include "source/common/common/fmt.h"
40
#include "source/common/common/utility.h"
41
#include "source/common/config/utility.h"
42
#include "source/common/http/http1/codec_stats.h"
43
#include "source/common/http/http2/codec_stats.h"
44
#include "source/common/http/utility.h"
45
#include "source/common/network/address_impl.h"
46
#include "source/common/network/filter_state_proxy_info.h"
47
#include "source/common/network/happy_eyeballs_connection_impl.h"
48
#include "source/common/network/resolver_impl.h"
49
#include "source/common/network/socket_option_factory.h"
50
#include "source/common/network/socket_option_impl.h"
51
#include "source/common/protobuf/protobuf.h"
52
#include "source/common/protobuf/utility.h"
53
#include "source/common/router/config_impl.h"
54
#include "source/common/router/config_utility.h"
55
#include "source/common/runtime/runtime_features.h"
56
#include "source/common/runtime/runtime_impl.h"
57
#include "source/common/stats/deferred_creation.h"
58
#include "source/common/upstream/cluster_factory_impl.h"
59
#include "source/common/upstream/health_checker_impl.h"
60
#include "source/common/upstream/locality_pool.h"
61
#include "source/server/transport_socket_config_impl.h"
62

            
63
#include "absl/container/node_hash_set.h"
64
#include "absl/strings/str_cat.h"
65

            
66
namespace Envoy {
67
namespace Upstream {
68
namespace {
69
// Returns the process-wide default Happy Eyeballs configuration: first address
// family DEFAULT with a first_address_family_count of 1. Constructed lazily on
// first use and shared thereafter.
const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig&
defaultHappyEyeballsConfig() {
  CONSTRUCT_ON_FIRST_USE(
      envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig, []() {
        envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig config;
        config.set_first_address_family_version(
            envoy::config::cluster::v3::UpstreamConnectionOptions::DEFAULT);
        config.mutable_first_address_family_count()->set_value(1);
        return config;
      }());
}
80

            
81
20047
// Stringifies an address for logging/diagnostics, tolerating a null pointer
// (which yields an empty string).
std::string addressToString(Network::Address::InstanceConstSharedPtr address) {
  return address != nullptr ? address->asString() : "";
}
87

            
88
// Resolves the protocol-options factory registered under `name` and uses it to
// translate `typed_config` into a ProtocolOptionsConfig.
//
// The registries are consulted in order: named network filters, named HTTP
// filters, then plain protocol-options factories. Returns
// InvalidArgumentError when no factory is registered under `name` or when the
// factory does not support protocol options.
absl::StatusOr<ProtocolOptionsConfigConstSharedPtr>
createProtocolOptionsConfig(const std::string& name, const Protobuf::Any& typed_config,
                            Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
  Server::Configuration::ProtocolOptionsFactory* factory =
      Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory(
          name);
  if (factory == nullptr) {
    factory =
        Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory(
            name);
  }
  if (factory == nullptr) {
    factory =
        Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(name);
  }

  if (factory == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("Didn't find a registered network or http filter or protocol "
                    "options implementation for name: '{}'",
                    name));
  }

  // A registered factory may still opt out of protocol options entirely.
  ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto();
  if (proto_config == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("filter {} does not support protocol options", name));
  }

  // Unpack the opaque Any into the factory's concrete proto, validating it
  // against the context's message validation visitor.
  RETURN_IF_NOT_OK(Envoy::Config::Utility::translateOpaqueConfig(
      typed_config, factory_context.messageValidationVisitor(), *proto_config));
  return factory->createProtocolOptionsConfig(*proto_config, factory_context);
}
122

            
123
// Builds the per-extension protocol options map for a cluster from its
// `typed_extension_protocol_options`. Entries whose factory returns a null
// config are silently skipped; the first factory/translation error aborts the
// whole parse.
absl::StatusOr<absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>>
parseExtensionProtocolOptions(
    const envoy::config::cluster::v3::Cluster& config,
    Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
  absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> options;

  for (const auto& [name, any_config] : config.typed_extension_protocol_options()) {
    auto object_or_error = createProtocolOptionsConfig(name, any_config, factory_context);
    RETURN_IF_NOT_OK_REF(object_or_error.status());
    if (*object_or_error != nullptr) {
      options[name] = std::move(*object_or_error);
    }
  }

  return options;
}
140

            
141
// Updates the EDS health flags for an existing host to match the new host.
142
// @param updated_host the new host to read health flag values from.
143
// @param existing_host the host to update.
144
// @return bool whether the flag update caused the host health to change.
145
492
bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) {
146
492
  auto updated_eds_health_status = updated_host.edsHealthStatus();
147

            
148
  // Check if the health flag has changed.
149
492
  if (updated_eds_health_status == existing_host.edsHealthStatus()) {
150
460
    return false;
151
460
  }
152

            
153
32
  const auto previous_health = existing_host.coarseHealth();
154
32
  existing_host.setEdsHealthStatus(updated_eds_health_status);
155

            
156
  // Rebuild if changing the flag affected the host health.
157
32
  return previous_health != existing_host.coarseHealth();
158
492
}
159

            
160
// Converts a set of hosts into a HostVector, excluding certain hosts.
161
// @param hosts hosts to convert
162
// @param excluded_hosts hosts to exclude from the resulting vector.
163
HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts,
164
1590
                       const absl::node_hash_set<HostSharedPtr>& excluded_hosts) {
165
1590
  HostVector net_hosts;
166
1590
  net_hosts.reserve(hosts.size());
167

            
168
1785
  for (const auto& host : hosts) {
169
1164
    if (excluded_hosts.find(host) == excluded_hosts.end()) {
170
1054
      net_hosts.emplace_back(host);
171
1054
    }
172
1164
  }
173

            
174
1590
  return net_hosts;
175
1590
}
176

            
177
// Creates the "cluster.<name>." stats scope for a cluster, preferring
// alt_stat_name over the cluster name when one is configured.
Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config,
                                         Stats::Store& stats) {
  const std::string& stat_name =
      config.alt_stat_name().empty() ? config.name() : config.alt_stat_name();
  return stats.createScope(absl::StrCat("cluster.", stat_name, "."));
}
182

            
183
// Assembles the socket options shared by every upstream local address:
// SO_NOSIGPIPE (where supported), IP_FREEBIND, and TCP keepalive.
Network::ConnectionSocket::OptionsSharedPtr
buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
                       const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
  auto base_options = std::make_shared<Network::ConnectionSocket::Options>();

  // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
  // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
  if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
    Network::Socket::appendOptions(base_options,
                                   Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
  }

  // Cluster IP_FREEBIND settings, when set, override the cluster-manager-wide
  // setting; otherwise the bootstrap value applies.
  const auto& cluster_bind = cluster_config.upstream_bind_config();
  const bool freebind = cluster_bind.has_freebind() ? cluster_bind.freebind().value()
                                                    : bootstrap_bind_config.freebind().value();
  if (freebind) {
    Network::Socket::appendOptions(base_options,
                                   Network::SocketOptionFactory::buildIpFreebindOptions());
  }

  if (cluster_config.upstream_connection_options().has_tcp_keepalive()) {
    Network::Socket::appendOptions(
        base_options,
        Network::SocketOptionFactory::buildTcpKeepaliveOptions(Network::parseTcpKeepaliveConfig(
            cluster_config.upstream_connection_options().tcp_keepalive())));
  }

  return base_options;
}
211

            
212
// Builds the literal socket options for a cluster. Cluster-level
// socket_options, when present, completely replace the cluster-manager-wide
// (bootstrap) ones.
Network::ConnectionSocket::OptionsSharedPtr
buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
                          const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
  auto cluster_options = std::make_shared<Network::ConnectionSocket::Options>();

  const auto& cluster_opts = cluster_config.upstream_bind_config().socket_options();
  const auto& bootstrap_opts = bootstrap_bind_config.socket_options();
  if (!cluster_opts.empty() || !bootstrap_opts.empty()) {
    // Cluster socket_options trump cluster manager wide.
    const auto& chosen = !cluster_opts.empty() ? cluster_opts : bootstrap_opts;
    Network::Socket::appendOptions(cluster_options,
                                   Network::SocketOptionFactory::buildLiteralOptions(chosen));
  }

  return cluster_options;
}
229

            
230
// Expands a BindConfig (if any) into the ordered list of upstream local
// addresses, merging the base and cluster socket options into each entry.
//
// - With no bind config: a single entry with a null address and the merged
//   options.
// - Otherwise: the (possibly null) source_address first, then any
//   extra_source_addresses (which may carry their own per-address socket
//   options, replacing the cluster-level ones), then any
//   additional_source_addresses (which always inherit the cluster options).
//
// When more than one address results, every entry must have a resolved
// (non-null) address; otherwise InvalidArgumentError is returned.
absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>>
parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config,
                const absl::optional<std::string>& cluster_name,
                Network::ConnectionSocket::OptionsSharedPtr base_socket_options,
                Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) {
  std::vector<::Envoy::Upstream::UpstreamLocalAddress> addresses;

  if (!bind_config.has_value()) {
    // If there is no bind config specified, then return a nullptr for the address.
    UpstreamLocalAddress local_address;
    local_address.address_ = nullptr;
    local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
    ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, base_socket_options);
    ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, cluster_socket_options);
    addresses.push_back(local_address);
    // A single entry never triggers the multi-address validation below.
    return addresses;
  }

  // Primary source address (may be absent, leaving a null address).
  UpstreamLocalAddress primary;
  primary.address_ = nullptr;
  if (bind_config->has_source_address()) {
    auto address_or_error =
        ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address());
    RETURN_IF_NOT_OK_REF(address_or_error.status());
    primary.address_ = address_or_error.value();
  }
  primary.socket_options_ = std::make_shared<Network::ConnectionSocket::Options>();
  ::Envoy::Network::Socket::appendOptions(primary.socket_options_, base_socket_options);
  ::Envoy::Network::Socket::appendOptions(primary.socket_options_, cluster_socket_options);
  addresses.push_back(primary);

  // extra_source_addresses: per-address socket options, when present, replace
  // the cluster-level options for that address only.
  for (const auto& extra_source_address : bind_config->extra_source_addresses()) {
    UpstreamLocalAddress entry;
    auto address_or_error =
        ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address());
    RETURN_IF_NOT_OK_REF(address_or_error.status());
    entry.address_ = address_or_error.value();

    entry.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
    ::Envoy::Network::Socket::appendOptions(entry.socket_options_, base_socket_options);
    if (extra_source_address.has_socket_options()) {
      ::Envoy::Network::Socket::appendOptions(
          entry.socket_options_,
          ::Envoy::Network::SocketOptionFactory::buildLiteralOptions(
              extra_source_address.socket_options().socket_options()));
    } else {
      ::Envoy::Network::Socket::appendOptions(entry.socket_options_, cluster_socket_options);
    }
    addresses.push_back(entry);
  }

  // additional_source_addresses: always inherit base + cluster options.
  for (const auto& additional_source_address : bind_config->additional_source_addresses()) {
    UpstreamLocalAddress entry;
    auto address_or_error =
        ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address);
    RETURN_IF_NOT_OK_REF(address_or_error.status());
    entry.address_ = address_or_error.value();
    entry.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
    ::Envoy::Network::Socket::appendOptions(entry.socket_options_, base_socket_options);
    ::Envoy::Network::Socket::appendOptions(entry.socket_options_, cluster_socket_options);
    addresses.push_back(entry);
  }

  // Verify that we have valid addresses if size is greater than 1.
  if (addresses.size() > 1) {
    for (const auto& entry : addresses) {
      if (entry.address_ == nullptr) {
        return absl::InvalidArgumentError(fmt::format(
            "{}'s upstream binding config has invalid IP addresses.",
            !(cluster_name.has_value()) ? "Bootstrap"
                                        : fmt::format("Cluster {}", cluster_name.value())));
      }
    }
  }

  return addresses;
}
318

            
319
// Builds the UpstreamLocalAddressSelector for a cluster.
//
// The cluster's upstream_bind_config, when present, completely overrides the
// bootstrap-level bind config. The chosen bind config is validated (mutually
// exclusive address lists, source_address preconditions, Linux-only network
// namespaces), then handed — along with the merged socket options — to either
// the selector factory named in the config or the built-in default selector.
absl::StatusOr<Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr>
createUpstreamLocalAddressSelector(
    const envoy::config::cluster::v3::Cluster& cluster_config,
    const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) {

  // Use the cluster bind config if specified. This completely overrides the
  // bootstrap bind config when present.
  OptRef<const envoy::config::core::v3::BindConfig> bind_config;
  absl::optional<std::string> cluster_name;
  if (cluster_config.has_upstream_bind_config()) {
    bind_config.emplace(cluster_config.upstream_bind_config());
    cluster_name.emplace(cluster_config.name());
  } else if (bootstrap_bind_config.has_value()) {
    bind_config.emplace(*bootstrap_bind_config);
  }

  // Verify that the bind config is valid before doing any work with it.
  if (bind_config.has_value()) {
    // The two address-list fields are mutually exclusive.
    if (bind_config->additional_source_addresses_size() > 0 &&
        bind_config->extra_source_addresses_size() > 0) {
      return absl::InvalidArgumentError(fmt::format(
          "Can't specify both `extra_source_addresses` and `additional_source_addresses` "
          "in the {}'s upstream binding config",
          !(cluster_name.has_value()) ? "Bootstrap"
                                      : fmt::format("Cluster {}", cluster_name.value())));
    }

    // Extra/additional addresses are meaningless without a primary source_address.
    if (!bind_config->has_source_address() &&
        (bind_config->extra_source_addresses_size() > 0 ||
         bind_config->additional_source_addresses_size() > 0)) {
      return absl::InvalidArgumentError(fmt::format(
          "{}'s upstream binding config has extra/additional source addresses but no "
          "source_address. Extra/additional addresses cannot be specified if "
          "source_address is not set.",
          !(cluster_name.has_value()) ? "Bootstrap"
                                      : fmt::format("Cluster {}", cluster_name.value())));
    }

#if !defined(__linux__)
    // Network namespace filepaths are only usable on Linux; reject them
    // anywhere they could appear on other platforms.
    auto fail_status = absl::InvalidArgumentError(fmt::format(
        "{}'s upstream binding config contains addresses with network namespace filepaths, but the "
        "OS is not Linux. Network namespaces can only be used on Linux.",
        !(cluster_name.has_value()) ? "Bootstrap"
                                    : fmt::format("Cluster {}", cluster_name.value())));
    if (bind_config->has_source_address() &&
        !bind_config->source_address().network_namespace_filepath().empty()) {
      return fail_status;
    }
    for (const auto& addr : bind_config->extra_source_addresses()) {
      if (addr.has_address() && !addr.address().network_namespace_filepath().empty()) {
        return fail_status;
      }
    }
    for (const auto& addr : bind_config->additional_source_addresses()) {
      if (!addr.network_namespace_filepath().empty()) {
        return fail_status;
      }
    }
#endif
  }

  // Pick the selector factory: the one named in the bind config, or the
  // default selector when none is configured.
  UpstreamLocalAddressSelectorFactory* local_address_selector_factory;
  const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config =
      bind_config.has_value() && bind_config->has_local_address_selector()
          ? &bind_config->local_address_selector()
          : nullptr;
  if (local_address_selector_config != nullptr) {
    local_address_selector_factory =
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(
            *local_address_selector_config, false);
  } else {
    // Create the default local address selector if one was not specified.
    envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config;
    envoy::config::core::v3::TypedExtensionConfig typed_extension;
    typed_extension.mutable_typed_config()->PackFrom(default_config);
    local_address_selector_factory =
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension,
                                                                                 false);
  }

  absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>> config_or_error =
      parseBindConfig(
          bind_config, cluster_name,
          buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or(
                                                     envoy::config::core::v3::BindConfig{})),
          buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or(
                                                        envoy::config::core::v3::BindConfig{})));
  RETURN_IF_NOT_OK_REF(config_or_error.status());
  auto selector_or_error = local_address_selector_factory->createLocalAddressSelector(
      config_or_error.value(), cluster_name);
  RETURN_IF_NOT_OK_REF(selector_or_error.status());
  return selector_or_error.value();
}
410

            
411
} // namespace
412

            
413
// Allow disabling ALPN checks for transport sockets. See
414
// https://github.com/envoyproxy/envoy/issues/22876
415
const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey =
416
    "config.do_not_validate_alpn_support";
417

            
418
// Overriding drop_overload ratio settings from EDS.
419
const absl::string_view ClusterImplBase::DropOverloadRuntimeKey =
420
    "load_balancing_policy.drop_overload_limit";
421

            
422
// TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the
423
// stat name into every host that receives a copy of that metric. This can be improved by putting
424
// a single copy of the stat name into a thread-local key->index map so that the lock can be avoided
425
// and using the index as the key to the stat map instead.
426
103
void LoadMetricStatsImpl::add(const absl::string_view key, double value) {
427
103
  absl::MutexLock lock(mu_);
428
103
  if (map_ == nullptr) {
429
30
    map_ = std::make_unique<StatMap>();
430
30
  }
431
103
  Stat& stat = (*map_)[key];
432
103
  ++stat.num_requests_with_metric;
433
103
  stat.total_metric_value += value;
434
103
}
435

            
436
90
// Atomically takes ownership of the accumulated stat map and resets the
// accumulator, so the next add() starts a fresh map. May return nullptr when
// nothing was recorded since the last latch.
LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() {
  // Pointer-taking MutexLock constructor for compatibility with all Abseil
  // releases (the reference overload is newer).
  absl::MutexLock lock(&mu_);
  // Moving from a unique_ptr is guaranteed to leave it null, so no explicit
  // reset of map_ is needed.
  StatMapPtr latched = std::move(map_);
  return latched;
}
442

            
443
// Factory for HostDescriptionImpl. The constructor reports configuration
// errors through an out-parameter status (it cannot throw), so this wrapper
// constructs the object and converts that status into the StatusOr result.
absl::StatusOr<std::unique_ptr<HostDescriptionImpl>> HostDescriptionImpl::create(
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, const AddressVector& address_list) {
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<HostDescriptionImpl> host{new HostDescriptionImpl(
      creation_status, cluster, hostname, dest_address, endpoint_metadata, locality_metadata,
      locality, health_check_config, priority, address_list)};
  RETURN_IF_NOT_OK(creation_status);
  return host;
}
457

            
458
// Delegates validation and most fields to HostDescriptionImplBase, then caches
// the destination address, the optional address list (null when empty), and
// the resolved health check address.
HostDescriptionImpl::HostDescriptionImpl(
    absl::Status& creation_status, ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, const AddressVector& address_list)
    : HostDescriptionImplBase(cluster, hostname, dest_address, endpoint_metadata, locality_metadata,
                              locality, health_check_config, priority, creation_status),
      address_(dest_address),
      address_list_or_null_(makeAddressListOrNull(dest_address, address_list)),
      health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {}
470

            
471
// Base host description: captures cluster, naming, metadata, locality and the
// resolved transport socket factory. Constructor cannot throw, so invalid
// configurations are reported through `creation_status`.
//
// Note the init-list ordering: `locality_` is move-initialized before
// `locality_zone_stat_name_` reads `locality_->zone()`, which is safe because
// members initialize in declaration order.
HostDescriptionImplBase::HostDescriptionImplBase(
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, absl::Status& creation_status)
    : cluster_(cluster), hostname_(hostname),
      health_checks_hostname_(health_check_config.hostname()),
      canary_(Config::Metadata::metadataValue(endpoint_metadata.get(),
                                              Config::MetadataFilters::get().ENVOY_LB,
                                              Config::MetadataEnvoyLbKeys::get().CANARY)
                  .bool_value()),
      endpoint_metadata_(endpoint_metadata),
      endpoint_metadata_hash_(endpoint_metadata ? MessageUtil::hash(*endpoint_metadata) : 0),
      locality_metadata_(locality_metadata), locality_(std::move(locality)),
      locality_zone_stat_name_(locality_->zone(), cluster->statsScope().symbolTable()),
      priority_(priority),
      socket_factory_(resolveTransportSocketFactory(dest_address, endpoint_metadata_.get())) {
  // NOTE(review): this first branch dereferences `dest_address` without a null
  // check while the branch below guards with one — confirm callers never pass
  // a null address together with a non-zero health check port.
  if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
    // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
    // for a pipe address is a misconfiguration.
    // (Constant message: no need to route it through fmt::format.)
    creation_status = absl::InvalidArgumentError(
        "Invalid host configuration: non-zero port for non-IP address");
  } else if (dest_address && dest_address->networkNamespace().has_value()) {
    creation_status = absl::InvalidArgumentError(
        "Invalid host configuration: hosts cannot specify network namespaces with their address");
  }
}
500

            
501
// Wraps `address_list` in a shared vector, or returns an empty (null) pointer
// when there is no primary address or the list is empty. The primary address
// is expected to lead the list.
HostDescription::SharedConstAddressVector HostDescriptionImplBase::makeAddressListOrNull(
    const Network::Address::InstanceConstSharedPtr& address, const AddressVector& address_list) {
  if (address == nullptr || address_list.empty()) {
    return {};
  }
  ASSERT(*address_list.front() == *address);
  return std::make_shared<AddressVector>(address_list);
}
509

            
510
// Runs the cluster's transport socket matcher against the endpoint/locality
// metadata (and optional per-connection options) and returns the matched
// factory, bumping the match counter and logging the selection.
Network::UpstreamTransportSocketFactory& HostDescriptionImplBase::resolveTransportSocketFactory(
    const Network::Address::InstanceConstSharedPtr& dest_address,
    const envoy::config::core::v3::Metadata* endpoint_metadata,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
  auto socket_match = cluster_->transportSocketMatcher().resolve(
      endpoint_metadata, locality_metadata_.get(), transport_socket_options);
  socket_match.stats_.total_match_count_.inc();
  ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}",
            socket_match.name_, dest_address ? dest_address->asString() : "empty");
  return socket_match.factory_;
}
522

            
523
// Creates an upstream connection to this host. When the transport socket
// matcher consumes downstream filter state, the factory must be re-resolved
// per connection; otherwise the host's precomputed factory is reused.
Host::CreateConnectionData HostImplBase::createConnection(
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
  const bool per_connection_resolution =
      cluster().transportSocketMatcher().usesFilterState() && transport_socket_options != nullptr &&
      !transport_socket_options->downstreamSharedFilterStateObjects().empty();

  Network::UpstreamTransportSocketFactory& factory =
      per_connection_resolution
          ? resolveTransportSocketFactory(address(), metadata().get(), transport_socket_options)
          : transportSocketFactory();

  return createConnection(dispatcher, cluster(), address(), addressListOrNull(), factory, options,
                          transport_socket_options, shared_from_this());
}
538

            
539
26866
// Translates an EDS-provided health status into the corresponding host health
// flag. Any previously set EDS flags are cleared first so the host carries at
// most one EDS-sourced flag at a time.
void HostImplBase::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) {
  // Reset all EDS-sourced flags before applying the new status.
  HostImplBase::healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH);
  HostImplBase::healthFlagClear(Host::HealthFlag::DEGRADED_EDS_HEALTH);
  HostImplBase::healthFlagClear(Host::HealthFlag::EDS_STATUS_DRAINING);

  switch (health_status) {
  // UNHEALTHY and TIMEOUT both map onto the failed-health flag.
  case envoy::config::core::v3::UNHEALTHY:
  case envoy::config::core::v3::TIMEOUT:
    HostImplBase::healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH);
    break;
  case envoy::config::core::v3::DRAINING:
    HostImplBase::healthFlagSet(Host::HealthFlag::EDS_STATUS_DRAINING);
    break;
  case envoy::config::core::v3::DEGRADED:
    HostImplBase::healthFlagSet(Host::HealthFlag::DEGRADED_EDS_HEALTH);
    break;
  default:
    // All remaining statuses (e.g. HEALTHY) leave every EDS flag cleared.
    break;
  }
}
563

            
564
// Creates a connection to the host's health check address. When per-check
// metadata is supplied, the transport socket factory is re-resolved against it;
// otherwise the host's cached factory is used. No socket options and no
// address list (no happy eyeballs) are used for health check connections.
Host::CreateConnectionData HostImplBase::createHealthCheckConnection(
    Event::Dispatcher& dispatcher,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    const envoy::config::core::v3::Metadata* metadata) const {
  Network::UpstreamTransportSocketFactory* factory = &transportSocketFactory();
  if (metadata != nullptr) {
    factory =
        &resolveTransportSocketFactory(healthCheckAddress(), metadata, transport_socket_options);
  }
  return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, *factory, nullptr,
                          transport_socket_options, shared_from_this());
}
575

            
576
// Determines whether a new upstream connection must be redirected through an
// HTTP/1.1 proxy instead of being opened directly to the host.
//
// Lookup precedence:
//   1. Per-connection transport socket options (http11ProxyInfo).
//   2. Endpoint metadata, then locality metadata, under the
//      ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR typed filter metadata key.
//   3. The transport socket factory's default HTTP/1.1 proxy info.
//
// Returns absl::nullopt when no proxy applies. Note that a metadata entry that
// fails to parse or resolve also yields absl::nullopt (after logging): the
// factory default is deliberately NOT consulted in that case.
absl::optional<Network::Address::InstanceConstSharedPtr> HostImplBase::maybeGetProxyRedirectAddress(
    const Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    HostDescriptionConstSharedPtr host,
    const Network::UpstreamTransportSocketFactory& socket_factory) {
  // Per-connection options win over all metadata and defaults.
  if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) {
    return transport_socket_options->http11ProxyInfo()->proxy_address;
  }

  // See if host metadata contains a proxy address and only check locality metadata if host
  // metadata did not have the relevant key.
  for (const auto& metadata : {host->metadata(), host->localityMetadata()}) {
    if (metadata == nullptr) {
      continue;
    }

    auto addr_it = metadata->typed_filter_metadata().find(
        Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR);
    if (addr_it == metadata->typed_filter_metadata().end()) {
      continue;
    }

    // Parse an address from the metadata.
    envoy::config::core::v3::Address proxy_addr;
    auto status = MessageUtil::unpackTo(addr_it->second, proxy_addr);
    if (!status.ok()) {
      // Malformed metadata: log (rate-limited) and proceed without redirection.
      ENVOY_LOG_EVERY_POW_2(
          error, "failed to parse proto from endpoint/locality metadata field {}, host={}",
          Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR,
          host->hostname());
      return absl::nullopt;
    }

    // Resolve the parsed address proto.
    auto resolve_status = Network::Address::resolveProtoAddress(proxy_addr);
    if (!resolve_status.ok()) {
      // Unresolvable address: log (rate-limited) and proceed without redirection.
      ENVOY_LOG_EVERY_POW_2(
          error, "failed to resolve address from endpoint/locality metadata field {}, host={}",
          Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR,
          host->hostname());
      return absl::nullopt;
    }

    // We successfully resolved, so return the instance ptr.
    return resolve_status.value();
  }

  // Proxy address was not found in the metadata. If a default proxy address is set, return that.
  if (socket_factory.defaultHttp11ProxyInfo().has_value()) {
    return socket_factory.defaultHttp11ProxyInfo()->proxy_address;
  }

  return absl::nullopt;
}
629

            
630
// Opens a new client connection for `host`, choosing among three connect
// strategies:
//   1. HTTP/1.1 proxy redirect: connect to a proxy address taken from
//      options/metadata/factory default (see maybeGetProxyRedirectAddress).
//   2. Happy eyeballs: when the host has more than one resolved address,
//      race connections across the address list.
//   3. Direct: a plain connection to the host's primary address.
// Regardless of strategy, the new connection then receives the cluster's
// buffer limits, optional high-watermark timeout, the upstream host on its
// stream info, and the cluster's network filter chain.
Host::CreateConnectionData HostImplBase::createConnection(
    Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
    const Network::Address::InstanceConstSharedPtr& address,
    const SharedConstAddressVector& address_list_or_null,
    Network::UpstreamTransportSocketFactory& socket_factory,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    HostDescriptionConstSharedPtr host) {
  auto source_address_selector = cluster.getUpstreamLocalAddressSelector();

  absl::optional<Network::Address::InstanceConstSharedPtr> proxy_address =
      maybeGetProxyRedirectAddress(transport_socket_options, host, socket_factory);

  Network::ClientConnectionPtr connection;
  // If the transport socket options or endpoint/locality metadata indicate the connection should
  // be redirected to a proxy, create the TCP connection to the proxy's address not the host's
  // address.
  if (proxy_address.has_value()) {
    // NOTE: the local address is still selected against the host's address,
    // not the proxy's.
    auto upstream_local_address = source_address_selector->getUpstreamLocalAddress(
        address, options, makeOptRefFromPtr(transport_socket_options.get()));
    ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}",
              proxy_address.value()->asString());
    connection = dispatcher.createClientConnection(
        proxy_address.value(), upstream_local_address.address_,
        socket_factory.createTransportSocket(transport_socket_options, host),
        upstream_local_address.socket_options_, transport_socket_options);
  } else if (address_list_or_null != nullptr && address_list_or_null->size() > 1) {
    ENVOY_LOG(debug, "Upstream using happy eyeballs config.");
    // Use the cluster's happy eyeballs config when present, else the default.
    const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig&
        happy_eyeballs_config =
            cluster.happyEyeballsConfig().has_value() ? *cluster.happyEyeballsConfig()
                                                      : defaultHappyEyeballsConfig();
    connection = std::make_unique<Network::HappyEyeballsConnectionImpl>(
        dispatcher, *address_list_or_null, source_address_selector, socket_factory,
        transport_socket_options, host, options, happy_eyeballs_config);
  } else {
    // Single-address direct connection.
    auto upstream_local_address = source_address_selector->getUpstreamLocalAddress(
        address, options, makeOptRefFromPtr(transport_socket_options.get()));
    connection = dispatcher.createClientConnection(
        address, upstream_local_address.address_,
        socket_factory.createTransportSocket(transport_socket_options, host),
        upstream_local_address.socket_options_, transport_socket_options);
  }

  // Common post-creation setup, applied to all three strategies above.
  connection->connectionInfoSetter().enableSettingInterfaceName(
      cluster.setLocalInterfaceNameOnUpstreamConnections());
  connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
  const auto timeout = cluster.perConnectionBufferHighWatermarkTimeout();
  if (timeout.count() > 0) {
    connection->setBufferHighWatermarkTimeout(timeout);
  }
  if (auto upstream_info = connection->streamInfo().upstreamInfo(); upstream_info) {
    upstream_info->setUpstreamHost(host);
  }
  cluster.createNetworkFilterChain(*connection);
  return {std::move(connection), std::move(host)};
}
687

            
688
27033
// Sets the host weight, clamped to a minimum of 1 (equivalent to
// std::max(1U, new_weight)).
void HostImplBase::weight(uint32_t new_weight) {
  weight_ = (new_weight == 0) ? 1 : new_weight;
}
689

            
690
// Factory for HostImpl. The constructor cannot return a status, so it reports
// failures through an out-param which this wrapper converts into a StatusOr;
// callers never observe a partially constructed host.
absl::StatusOr<std::unique_ptr<HostImpl>> HostImpl::create(
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata, uint32_t initial_weight,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, const envoy::config::core::v3::HealthStatus health_status,
    const AddressVector& address_list) {
  absl::Status ctor_status = absl::OkStatus();
  std::unique_ptr<HostImpl> host(new HostImpl(
      ctor_status, cluster, hostname, address, endpoint_metadata, locality_metadata,
      initial_weight, locality, health_check_config, priority, health_status, address_list));
  RETURN_IF_NOT_OK(ctor_status);
  return host;
}
705

            
706
std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter(
707
20701
    const std::vector<std::function<bool(const Host&)>>& predicates) const {
708
  // We keep two lists: one for being able to mutate the clone and one for returning to the
709
  // caller. Creating them both at the start avoids iterating over the mutable values at the end
710
  // to convert them to a const pointer.
711
20701
  std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones;
712
20701
  std::vector<HostsPerLocalityConstSharedPtr> filtered_clones;
713

            
714
20701
  mutable_clones.reserve(predicates.size());
715
20701
  filtered_clones.reserve(predicates.size());
716
77522
  for (size_t i = 0; i < predicates.size(); ++i) {
717
56821
    mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>());
718
56821
    filtered_clones.emplace_back(mutable_clones.back());
719
56821
    mutable_clones.back()->local_ = local_;
720
56821
  }
721

            
722
20732
  for (const auto& hosts_locality : hosts_per_locality_) {
723
19294
    std::vector<HostVector> current_locality_hosts;
724
19294
    current_locality_hosts.resize(predicates.size());
725

            
726
    // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop.
727
21537
    for (const auto& host : hosts_locality) {
728
80506
      for (size_t i = 0; i < predicates.size(); ++i) {
729
59063
        if (predicates[i](*host)) {
730
19839
          current_locality_hosts[i].emplace_back(host);
731
19839
        }
732
59063
      }
733
21443
    }
734

            
735
74228
    for (size_t i = 0; i < predicates.size(); ++i) {
736
54934
      mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i]));
737
54934
    }
738
19294
  }
739

            
740
20701
  return filtered_clones;
741
20701
}
742

            
743
// Replaces this host set's membership wholesale with the pre-partitioned
// vectors in `update_hosts_params`, then notifies priority-update subscribers
// with the add/remove delta that produced the new membership.
// `weighted_priority_health` and `overprovisioning_factor` only overwrite the
// stored settings when explicitly provided (engaged optionals).
void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
                              LocalityWeightsConstSharedPtr locality_weights,
                              const HostVector& hosts_added, const HostVector& hosts_removed,
                              absl::optional<bool> weighted_priority_health,
                              absl::optional<uint32_t> overprovisioning_factor) {
  if (weighted_priority_health.has_value()) {
    weighted_priority_health_ = weighted_priority_health.value();
  }
  if (overprovisioning_factor.has_value()) {
    // The overprovisioning factor must be strictly positive.
    ASSERT(overprovisioning_factor.value() > 0);
    overprovisioning_factor_ = overprovisioning_factor.value();
  }
  // Adopt all host vectors and their per-locality views; the params object is
  // consumed (moved from).
  hosts_ = std::move(update_hosts_params.hosts);
  healthy_hosts_ = std::move(update_hosts_params.healthy_hosts);
  degraded_hosts_ = std::move(update_hosts_params.degraded_hosts);
  excluded_hosts_ = std::move(update_hosts_params.excluded_hosts);
  hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality);
  healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality);
  degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality);
  excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality);
  locality_weights_ = std::move(locality_weights);

  runUpdateCallbacks(hosts_added, hosts_removed);
}
767

            
768
// Bundles the full set of partitioned host collections into an
// UpdateHostsParams. Note the struct's member order (all flat vectors first,
// then all per-locality views) differs from this function's parameter order,
// which pairs each flat vector with its per-locality counterpart.
PrioritySet::UpdateHostsParams
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
                               HostsPerLocalityConstSharedPtr hosts_per_locality,
                               HealthyHostVectorConstSharedPtr healthy_hosts,
                               HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
                               DegradedHostVectorConstSharedPtr degraded_hosts,
                               HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
                               ExcludedHostVectorConstSharedPtr excluded_hosts,
                               HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) {
  return PrioritySet::UpdateHostsParams{std::move(hosts),
                                        std::move(healthy_hosts),
                                        std::move(degraded_hosts),
                                        std::move(excluded_hosts),
                                        std::move(hosts_per_locality),
                                        std::move(healthy_hosts_per_locality),
                                        std::move(degraded_hosts_per_locality),
                                        std::move(excluded_hosts_per_locality)};
}
786

            
787
17559
// Re-packages an existing host set's (already partitioned) host collections
// into UpdateHostsParams, sharing the underlying vectors rather than copying.
PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) {
  return updateHostsParams(host_set.hostsPtr(), host_set.hostsPerLocalityPtr(),
                           host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(),
                           host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(),
                           host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr());
}
793
// Splits the flat host list and the per-locality view into healthy, degraded
// and excluded partitions, then packages everything into UpdateHostsParams.
PrioritySet::UpdateHostsParams
HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts,
                            HostsPerLocalityConstSharedPtr hosts_per_locality) {
  auto [healthy, degraded, excluded] = ClusterImplBase::partitionHostList(*hosts);
  auto [healthy_per_locality, degraded_per_locality, excluded_per_locality] =
      ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality);

  return updateHostsParams(std::move(hosts), std::move(hosts_per_locality), std::move(healthy),
                           std::move(healthy_per_locality), std::move(degraded),
                           std::move(degraded_per_locality), std::move(excluded),
                           std::move(excluded_per_locality));
}
808

            
809
// Returns the host set for `priority`, lazily creating host sets for every
// priority up to and including the requested one. Each newly created set has
// its priority-update callback wired back into this priority set so reference
// update callbacks fire on membership changes.
const HostSet&
PrioritySetImpl::getOrCreateHostSet(uint32_t priority,
                                    absl::optional<bool> weighted_priority_health,
                                    absl::optional<uint32_t> overprovisioning_factor) {
  while (host_sets_.size() < priority + 1) {
    const size_t next_priority = host_sets_.size();
    HostSetImplPtr host_set =
        createHostSet(next_priority, weighted_priority_health, overprovisioning_factor);
    host_sets_priority_update_cbs_.push_back(host_set->addPriorityUpdateCb(
        [this](uint32_t updated_priority, const HostVector& hosts_added,
               const HostVector& hosts_removed) {
          return runReferenceUpdateCallbacks(updated_priority, hosts_added, hosts_removed);
        }));
    host_sets_.push_back(std::move(host_set));
  }
  return *host_sets_[priority];
}
826

            
827
// Updates the host set for `priority`, creating the host set on demand, and
// optionally refreshes the shared cross-priority host map.
void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
                                  LocalityWeightsConstSharedPtr locality_weights,
                                  const HostVector& hosts_added, const HostVector& hosts_removed,
                                  absl::optional<bool> weighted_priority_health,
                                  absl::optional<uint32_t> overprovisioning_factor,
                                  HostMapConstSharedPtr cross_priority_host_map) {
  // Update cross priority host map first. In this way, when the update callbacks of the priority
  // set are executed, the latest host map can always be obtained.
  if (cross_priority_host_map != nullptr) {
    const_cross_priority_host_map_ = std::move(cross_priority_host_map);
  }

  // Ensure that we have a HostSet for the given priority.
  getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor);
  static_cast<HostSetImpl*>(host_sets_[priority].get())
      ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added,
                    hosts_removed, weighted_priority_health, overprovisioning_factor);

  // During a batch update the per-update callbacks are suppressed; the batch
  // driver emits a single aggregated callback once the batch completes.
  if (!batch_update_) {
    runUpdateCallbacks(hosts_added, hosts_removed);
  }
}
849

            
850
803
// Runs a batch of host updates, then emits a single update callback with the
// net membership change instead of one callback per individual update.
void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) {
  // Every update performed through this scope records its added/removed hosts
  // (and suppresses per-update callbacks while in effect).
  BatchUpdateScope update_scope(*this);
  callback.batchUpdate(update_scope);

  // Reduce the recorded changes to a net diff: a host both added and removed
  // within the batch cancels out.
  HostVector added = filterHosts(update_scope.all_hosts_added_, update_scope.all_hosts_removed_);
  HostVector removed = filterHosts(update_scope.all_hosts_removed_, update_scope.all_hosts_added_);

  runUpdateCallbacks(added, removed);
}
862

            
863
void PrioritySetImpl::BatchUpdateScope::updateHosts(
864
    uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
865
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
866
    const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
867
798
    absl::optional<uint32_t> overprovisioning_factor) {
868
  // We assume that each call updates a different priority.
869
798
  ASSERT(priorities_.find(priority) == priorities_.end());
870
798
  priorities_.insert(priority);
871

            
872
1024
  for (const auto& host : hosts_added) {
873
984
    all_hosts_added_.insert(host);
874
984
  }
875

            
876
798
  for (const auto& host : hosts_removed) {
877
180
    all_hosts_removed_.insert(host);
878
180
  }
879

            
880
798
  parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added,
881
798
                      hosts_removed, weighted_priority_health, overprovisioning_factor);
882
798
}
883

            
884
// Main-thread variant of updateHosts: this class maintains the authoritative
// cross-priority host map itself, so an externally supplied map is rejected.
void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
                                      LocalityWeightsConstSharedPtr locality_weights,
                                      const HostVector& hosts_added,
                                      const HostVector& hosts_removed,
                                      absl::optional<bool> weighted_priority_health,
                                      absl::optional<uint32_t> overprovisioning_factor,
                                      HostMapConstSharedPtr cross_priority_host_map) {
  ASSERT(cross_priority_host_map == nullptr,
         "External cross-priority host map is meaningless to MainPrioritySetImpl");
  // Fold the membership delta into the locally owned map before delegating to
  // the base-class update (which runs the update callbacks).
  updateCrossPriorityHostMap(priority, hosts_added, hosts_removed);

  PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights,
                               hosts_added, hosts_removed, weighted_priority_health,
                               overprovisioning_factor);
}
899

            
900
18603
// Returns the current read-only cross-priority host map, first publishing any
// pending mutable copy produced by updateCrossPriorityHostMap().
HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
  // Fast path: no host-set change is pending, the published map is current.
  if (mutable_cross_priority_host_map_ == nullptr) {
    return const_cross_priority_host_map_;
  }
  // Publish the pending map; the mutable slot stays empty until the next
  // membership change copies the read-only map again.
  const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
  ASSERT(mutable_cross_priority_host_map_ == nullptr);
  return const_cross_priority_host_map_;
}
908

            
909
// Applies host additions/removals to the cross-priority host map using
// copy-on-write: the shared read-only map is never mutated in place. The
// resulting mutable copy is published lazily by crossPriorityHostMap().
void MainPrioritySetImpl::updateCrossPriorityHostMap(uint32_t priority,
                                                     const HostVector& hosts_added,
                                                     const HostVector& hosts_removed) {
  if (hosts_added.empty() && hosts_removed.empty()) {
    // No new hosts have been added and no old hosts have been removed.
    return;
  }

  // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes,
  // we cannot directly modify read_only_all_host_map_.
  if (mutable_cross_priority_host_map_ == nullptr) {
    // Copy old read only host map to mutable host map.
    mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
  }

  // Hosts are keyed by their stringified address.
  for (const auto& host : hosts_removed) {
    const auto host_address = addressToString(host->address());
    const auto existing_host = mutable_cross_priority_host_map_->find(host_address);
    if (existing_host != mutable_cross_priority_host_map_->end()) {
      // Only delete from the current priority to protect from situations where
      // the add operation was already executed and has already moved the metadata of the host
      // from a higher priority value to a lower priority value.
      if (existing_host->second->priority() == priority) {
        mutable_cross_priority_host_map_->erase(host_address);
      }
    }
  }

  for (const auto& host : hosts_added) {
    // insert() keeps the existing entry if the address is already present.
    mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
  }
}
941

            
942
// Creates the cluster's main traffic stats on `scope`, optionally deferring
// actual instantiation (defer_creation) via the deferred-compatible wrapper.
DeferredCreationCompatibleClusterTrafficStats
ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope,
                               const ClusterTrafficStatNames& stat_names, bool defer_creation) {
  return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(scope, stat_names,
                                                                   defer_creation);
}
948

            
949
// Instantiates the request/response size histograms for a cluster on `scope`.
ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats(
    Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) {
  return {stat_names, scope};
}
953

            
954
// Instantiates the load-report stats for a cluster on `scope`.
ClusterLoadReportStats
ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope,
                                         const ClusterLoadReportStatNames& stat_names) {
  return {stat_names, scope};
}
959

            
960
// Instantiates the timeout-budget stats for a cluster on `scope`.
ClusterTimeoutBudgetStats
ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope,
                                            const ClusterTimeoutBudgetStatNames& stat_names) {
  return {stat_names, scope};
}
965

            
966
// Returns the HTTP protocol options config for a cluster. If `options` was
// already built (e.g. from typed extension protocol options) it is returned
// as-is; otherwise a config is assembled from the legacy per-field protocol
// options on the Cluster proto. Returns InvalidArgumentError when both HTTP1
// and HTTP2 options are set under the default protocol_selection mode, or when
// the assembled config fails validation.
absl::StatusOr<std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>>
createOptions(const envoy::config::cluster::v3::Cluster& config,
              std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options,
              Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
  // Pre-built options win outright; the legacy fields are ignored.
  if (options) {
    return std::move(options);
  }

  if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) {
    // Make sure multiple protocol configurations are not present
    if (config.has_http_protocol_options() && config.has_http2_protocol_options()) {
      return absl::InvalidArgumentError(
          fmt::format("cluster: Both HTTP1 and HTTP2 options may only be "
                      "configured with non-default 'protocol_selection' values"));
    }
  }

  // Assemble the config from the individual legacy fields on the Cluster proto.
  auto options_or_error =
      ClusterInfoImpl::HttpProtocolOptionsConfigImpl::createProtocolOptionsConfig(
          config.http_protocol_options(), config.http2_protocol_options(),
          config.common_http_protocol_options(),
          (config.has_upstream_http_protocol_options()
               ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>(
                     config.upstream_http_protocol_options())
               : absl::nullopt),
          config.protocol_selection() ==
              envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL,
          config.has_http2_protocol_options(), factory_context.serverFactoryContext(),
          factory_context.messageValidationVisitor());
  RETURN_IF_NOT_OK_REF(options_or_error.status());
  return options_or_error.value();
}
998

            
999
// Maps a legacy Cluster::LbPolicy enum value onto the modern typed load
// balancer extension that implements it, and loads that extension's legacy
// config. LOAD_BALANCING_POLICY_CONFIG is handled by 'configureLbPolicies'
// and must never reach this function.
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset(
    Server::Configuration::ServerFactoryContext& factory_context, const ClusterProto& cluster) {
  // Translate the enum into the extension-point name of the implementing policy.
  const char* policy_name = nullptr;
  switch (cluster.lb_policy()) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case ClusterProto::ROUND_ROBIN:
    policy_name = "envoy.load_balancing_policies.round_robin";
    break;
  case ClusterProto::LEAST_REQUEST:
    policy_name = "envoy.load_balancing_policies.least_request";
    break;
  case ClusterProto::RANDOM:
    policy_name = "envoy.load_balancing_policies.random";
    break;
  case ClusterProto::RING_HASH:
    policy_name = "envoy.load_balancing_policies.ring_hash";
    break;
  case ClusterProto::MAGLEV:
    policy_name = "envoy.load_balancing_policies.maglev";
    break;
  case ClusterProto::CLUSTER_PROVIDED:
    policy_name = "envoy.load_balancing_policies.cluster_provided";
    break;
  case ClusterProto::LOAD_BALANCING_POLICY_CONFIG:
    // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
    // function and should not reach here.
    PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here");
    break;
  }

  TypedLoadBalancerFactory* lb_factory = nullptr;
  if (policy_name != nullptr) {
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(policy_name);
  }
  if (lb_factory == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("No load balancer factory found for LB type: {}",
                    ClusterProto::LbPolicy_Name(cluster.lb_policy())));
  }
  auto lb_config_or_error = lb_factory->loadLegacy(factory_context, cluster);
  RETURN_IF_NOT_OK_REF(lb_config_or_error.status());
  return Result{lb_factory, std::move(lb_config_or_error.value())};
}
// Entry point for translating a legacy cluster LB configuration. If the
// cluster configures subset load balancing (a lb_subset_config with at least
// one subset selector), the subset policy wraps the legacy policy; otherwise
// the legacy policy is translated directly.
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
    Server::Configuration::ServerFactoryContext& factory_context, const ClusterProto& cluster) {
  // A lb_subset_config without any subset selectors is treated as absent.
  const bool use_subset_lb = cluster.has_lb_subset_config() &&
                             !cluster.lb_subset_config().subset_selectors().empty();
  if (!use_subset_lb) {
    return getTypedLbConfigFromLegacyProtoWithoutSubset(factory_context, cluster);
  }
  auto* subset_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
      "envoy.load_balancing_policies.subset");
  if (subset_factory == nullptr) {
    return absl::InvalidArgumentError("No subset load balancer factory found");
  }
  auto config_or_error = subset_factory->loadLegacy(factory_context, cluster);
  RETURN_IF_NOT_OK_REF(config_or_error.status());
  return Result{subset_factory, std::move(config_or_error.value())};
}
using ProtocolOptionsHashMap =
    absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>;
// Factory wrapper around the private constructor: the constructor reports
// failures through an out-parameter status, which this helper converts into
// an absl::StatusOr so callers get a single-value result.
absl::StatusOr<std::unique_ptr<ClusterInfoImpl>>
ClusterInfoImpl::create(Init::Manager& info,
                        Server::Configuration::ServerFactoryContext& server_context,
                        const envoy::config::cluster::v3::Cluster& config,
                        const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
                        Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
                        Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
                        Server::Configuration::TransportSocketFactoryContext& ctx) {
  absl::Status status = absl::OkStatus();
  std::unique_ptr<ClusterInfoImpl> cluster_info(new ClusterInfoImpl(
      info, server_context, config, bind_config, runtime, std::move(socket_matcher),
      std::move(stats_scope), added_via_api, ctx, status));
  RETURN_IF_NOT_OK(status);
  return cluster_info;
}
// Constructs the immutable, shared per-cluster configuration snapshot.
// All configuration is copied/derived from `config` in the member initializer
// list; validation performed in the constructor body reports failures through
// `creation_status` (set-and-return) so ClusterInfoImpl::create() can surface
// them as an absl::Status. NOTE(review): several initializers use
// THROW_OR_RETURN_VALUE, which presumably throws when exceptions are enabled —
// confirm against the macro definition.
ClusterInfoImpl::ClusterInfoImpl(
    Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context,
    const envoy::config::cluster::v3::Cluster& config,
    const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
    Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
    Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
    Server::Configuration::TransportSocketFactoryContext& factory_context,
    absl::Status& creation_status)
    : runtime_(runtime), name_(config.name()),
      // Only materialized when alt_stat_name is set on the cluster.
      observability_name_(!config.alt_stat_name().empty()
                              ? std::make_unique<std::string>(config.alt_stat_name())
                              : nullptr),
      eds_service_name_(
          config.has_eds_cluster_config()
              ? std::make_unique<std::string>(config.eds_cluster_config().service_name())
              : nullptr),
      extension_protocol_options_(THROW_OR_RETURN_VALUE(
          parseExtensionProtocolOptions(config, factory_context), ProtocolOptionsHashMap)),
      http_protocol_options_(THROW_OR_RETURN_VALUE(
          createOptions(config,
                        extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>(
                            "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"),
                        factory_context),
          std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>)),
      tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>(
          "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")),
      // HttpProtocolOptions takes precedence; falls back to the deprecated
      // cluster-level field's value. Mutual exclusivity is validated below.
      max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          http_protocol_options_->common_http_protocol_options_, max_requests_per_connection,
          config.max_requests_per_connection().value())),
      // Default connect timeout is 5s when not configured.
      connect_timeout_(
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))),
      per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)),
      peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(),
                                                       predictive_preconnect_ratio, 0)),
      socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)),
      traffic_stats_(generateStats(
          stats_scope_, factory_context.serverFactoryContext().clusterManager().clusterStatNames(),
          server_context.statsConfig().enableDeferredCreationStats())),
      config_update_stats_(
          factory_context.serverFactoryContext().clusterManager().clusterConfigUpdateStatNames(),
          *stats_scope_),
      lb_stats_(factory_context.serverFactoryContext().clusterManager().clusterLbStatNames(),
                *stats_scope_),
      endpoint_stats_(
          factory_context.serverFactoryContext().clusterManager().clusterEndpointStatNames(),
          *stats_scope_),
      // Load-report stats live in their own store rooted at this cluster's
      // symbol table.
      load_report_stats_store_(stats_scope_->symbolTable()),
      load_report_stats_(generateLoadReportStats(
          *load_report_stats_store_.rootScope(),
          factory_context.serverFactoryContext().clusterManager().clusterLoadReportStatNames())),
      // Optional stats are only allocated when explicitly requested in config.
      optional_cluster_stats_(
          (config.has_track_cluster_stats() || config.track_timeout_budgets())
              ? std::make_unique<OptionalClusterStats>(
                    config, *stats_scope_, factory_context.serverFactoryContext().clusterManager())
              : nullptr),
      features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures(
          config, *http_protocol_options_)),
      resource_managers_(config, runtime, name_, *stats_scope_,
                         factory_context.serverFactoryContext()
                             .clusterManager()
                             .clusterCircuitBreakersStatNames()),
      maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)),
      upstream_local_address_selector_(
          THROW_OR_RETURN_VALUE(createUpstreamLocalAddressSelector(config, bind_config),
                                Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr)),
      upstream_config_(config.has_upstream_config()
                           ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>(
                                 config.upstream_config())
                           : nullptr),
      metadata_(config.has_metadata()
                    ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata())
                    : nullptr),
      typed_metadata_(config.has_metadata()
                          ? std::make_unique<ClusterTypedMetadata>(config.metadata())
                          : nullptr),
      common_lb_config_(
          factory_context.serverFactoryContext().clusterManager().getCommonLbConfigPtr(
              config.common_lb_config())),
      cluster_type_(config.has_cluster_type()
                        ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>(
                              config.cluster_type())
                        : nullptr),
      http_filter_config_provider_manager_(
          Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
              server_context)),
      network_filter_config_provider_manager_(
          createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)),
      upstream_context_(server_context, init_manager, *stats_scope_),
      happy_eyeballs_config_(
          config.upstream_connection_options().has_happy_eyeballs_config()
              ? std::make_unique<
                    envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>(
                    config.upstream_connection_options().happy_eyeballs_config())
              : nullptr),
      lrs_report_metric_names_(!config.lrs_report_endpoint_metrics().empty()
                                   ? std::make_unique<Envoy::Orca::LrsReportMetricNames>(
                                         config.lrs_report_endpoint_metrics().begin(),
                                         config.lrs_report_endpoint_metrics().end())
                                   : nullptr),
      shadow_policies_(http_protocol_options_->shadow_policies_),
      // Default per-connection buffer limit is 1 MiB.
      per_connection_buffer_limit_bytes_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)),
      buffer_high_watermark_timeout_(std::chrono::milliseconds(
          PROTOBUF_GET_MS_OR_DEFAULT(config, per_connection_buffer_high_watermark_timeout, 0))),
      // Response header limits fall back to runtime overrides when the
      // HttpProtocolOptions do not set them explicitly.
      max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          http_protocol_options_->common_http_protocol_options_, max_headers_count,
          runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey,
                                         Http::DEFAULT_MAX_HEADERS_COUNT))),
      max_response_headers_kb_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          http_protocol_options_->common_http_protocol_options_, max_response_headers_kb,
          // Immediately-invoked lambda: a runtime value of 0 means "not set",
          // which maps to nullopt rather than a zero-KB limit.
          [&]() -> absl::optional<uint16_t> {
            constexpr uint64_t unspecified = 0;
            uint64_t runtime_val = runtime_.snapshot().getInteger(
                Http::MaxResponseHeadersSizeOverrideKey, unspecified);
            if (runtime_val == unspecified) {
              return absl::nullopt;
            }
            return runtime_val;
          }())),
      type_(config.type()),
      drain_connections_on_host_removal_(config.ignore_health_on_host_removal()),
      connection_pool_per_downstream_connection_(
          config.connection_pool_per_downstream_connection()),
      // Only warm hosts when active HC is configured AND the common LB config
      // asks to ignore new hosts until their first health check.
      warm_hosts_(!config.health_checks().empty() &&
                  common_lb_config_->ignore_new_hosts_until_first_hc()),
      set_local_interface_name_on_upstream_connections_(
          config.upstream_connection_options().set_local_interface_name_on_upstream_connections()),
      added_via_api_(added_via_api),
      per_endpoint_stats_(config.has_track_cluster_stats() &&
                          config.track_cluster_stats().per_endpoint_stats()) {
#ifdef WIN32
  // Local interface name lookup is rejected on Windows.
  if (set_local_interface_name_on_upstream_connections_) {
    creation_status = absl::InvalidArgumentError(
        "set_local_interface_name_on_upstream_connections_ cannot be set to true "
        "on Windows platforms");
    return;
  }
#endif

  // Both LoadStatsReporter interface implementations and per_endpoint_stats need to `latch()` the
  // counters, so if both are
  // configured they will interfere with each other and both get incorrect values.
  // TODO(ggreenway): Verify that bypassing virtual dispatch here was intentional
  if (ClusterInfoImpl::perEndpointStatsEnabled() &&
      server_context.bootstrap().cluster_manager().has_load_stats_config()) {
    creation_status =
        absl::InvalidArgumentError("Only one of cluster per_endpoint_stats and cluster manager "
                                   "load_stats_config can be specified");
    return;
  }

  // The deprecated cluster-level field and the HttpProtocolOptions field are
  // mutually exclusive.
  if (config.has_max_requests_per_connection() &&
      http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) {
    creation_status =
        absl::InvalidArgumentError("Only one of max_requests_per_connection from Cluster or "
                                   "HttpProtocolOptions can be specified");
    return;
  }

  if (config.has_load_balancing_policy() ||
      config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) {
    // If load_balancing_policy is set we will use it directly, ignoring lb_policy.
    SET_AND_RETURN_IF_NOT_OK(configureLbPolicies(config, server_context), creation_status);
  } else {
    // If load_balancing_policy is not set, we will try to convert legacy lb_policy
    // to load_balancing_policy and use it.
    auto lb_pair =
        LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(server_context, config);
    SET_AND_RETURN_IF_NOT_OK(lb_pair.status(), creation_status);
    load_balancer_factory_ = lb_pair->factory;
    ASSERT(load_balancer_factory_ != nullptr, "null load balancer factory");
    load_balancer_config_ = std::move(lb_pair->config);
  }

  if (config.lb_subset_config().locality_weight_aware() &&
      !config.common_lb_config().has_locality_weighted_lb_config()) {
    creation_status =
        absl::InvalidArgumentError(fmt::format("Locality weight aware subset LB requires that a "
                                               "locality_weighted_lb_config be set in {}",
                                               name_));
    return;
  }

  // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no
  // timeout should be used.
  absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1));
  if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) {
    idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
        http_protocol_options_->common_http_protocol_options_.idle_timeout()));
    if (idle_timeout.value().count() == 0) {
      idle_timeout = absl::nullopt;
    }
  }
  if (idle_timeout.has_value()) {
    optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout);
  }

  // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating
  // that no timeout should be used.
  absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10));
  if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) {
    tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout();
    if (tcp_pool_idle_timeout.value().count() == 0) {
      tcp_pool_idle_timeout = absl::nullopt;
    }
  }
  if (tcp_pool_idle_timeout.has_value()) {
    optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout);
  }

  // Use configured `max_connection_duration`, unless it's set to 0, indicating that
  // no timeout should be used. No timeout by default either.
  absl::optional<std::chrono::milliseconds> max_connection_duration;
  if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) {
    max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
        http_protocol_options_->common_http_protocol_options_.max_connection_duration()));
    if (max_connection_duration.value().count() == 0) {
      max_connection_duration = absl::nullopt;
    }
  }
  if (max_connection_duration.has_value()) {
    optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration);
  }

  // eds_cluster_config is only legal on EDS clusters.
  if (config.has_eds_cluster_config()) {
    if (config.type() != envoy::config::cluster::v3::Cluster::EDS) {
      creation_status = absl::InvalidArgumentError("eds_cluster_config set in a non-EDS cluster");
      return;
    }
  }

  // TODO(htuch): Remove this temporary workaround when we have
  // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides
  // early validation of sanity of fields that we should catch at config ingestion.
  DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window());

  // Create upstream network filter factories. A filter either carries an
  // inline typed_config (static provider) or a config_discovery reference
  // (dynamic provider); configuring both on one filter is an error.
  const auto& filters = config.filters();
  ASSERT(filter_factories_.empty());
  filter_factories_.reserve(filters.size());
  for (ssize_t i = 0; i < filters.size(); i++) {
    const auto& proto_config = filters[i];
    // Only the last filter in the chain is marked terminal.
    const bool is_terminal = i == filters.size() - 1;
    ENVOY_LOG(debug, "  upstream network filter #{}:", i);

    if (proto_config.has_config_discovery()) {
      if (proto_config.has_typed_config()) {
        creation_status =
            absl::InvalidArgumentError("Only one of typed_config or config_discovery can be used");
        return;
      }

      ENVOY_LOG(debug, "      dynamic filter name: {}", proto_config.name());
      filter_factories_.push_back(
          network_filter_config_provider_manager_->createDynamicFilterConfigProvider(
              proto_config.config_discovery(), proto_config.name(), server_context,
              upstream_context_, factory_context.serverFactoryContext().clusterManager(),
              is_terminal, "network", nullptr));
      continue;
    }

    ENVOY_LOG(debug, "    name: {}", proto_config.name());
    auto& factory = Config::Utility::getAndCheckFactory<
        Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config);
    auto message = factory.createEmptyConfigProto();
    SET_AND_RETURN_IF_NOT_OK(
        Config::Utility::translateOpaqueConfig(
            proto_config.typed_config(), factory_context.messageValidationVisitor(), *message),
        creation_status);
    Network::FilterFactoryCb callback =
        factory.createFilterFactoryFromProto(*message, upstream_context_);
    filter_factories_.push_back(
        network_filter_config_provider_manager_->createStaticFilterConfigProvider(
            callback, proto_config.name()));
  }

  // Build the upstream HTTP filter chain, if one was configured via
  // HttpProtocolOptions.
  if (http_protocol_options_) {
    if (!http_protocol_options_->http_filters_.empty()) {
      creation_status = Http::FilterChainUtility::checkUpstreamHttpFiltersList(
          http_protocol_options_->http_filters_);
      if (!creation_status.ok()) {
        return;
      }

      std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix());
      Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
                              Server::Configuration::UpstreamHttpFilterConfigFactory>
          helper(*http_filter_config_provider_manager_, upstream_context_.serverFactoryContext(),
                 factory_context.serverFactoryContext().clusterManager(), upstream_context_,
                 prefix);
      SET_AND_RETURN_IF_NOT_OK(helper.processFilters(http_protocol_options_->http_filters_,
                                                     "upstream http", "upstream http",
                                                     http_filter_factories_),
                               creation_status);
    }
  }
}
// Configures the load balancer from config.load_balancing_policy: validates
// that the extension-based policy is not mixed with legacy LB fields, then
// adopts the first policy in the ordered list with a registered factory.
absl::Status
ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
                                     Server::Configuration::ServerFactoryContext& context) {
  // The extension-based policy must be present.
  if (!config.has_load_balancing_policy()) {
    return absl::InvalidArgumentError("cluster: field load_balancing_policy need to be set");
  }

  // Legacy subset configuration is incompatible with the new policy field.
  if (config.has_lb_subset_config()) {
    return absl::InvalidArgumentError(
        "cluster: load_balancing_policy cannot be combined with lb_subset_config");
  }

  if (config.has_common_lb_config()) {
    const auto& common_config = config.common_lb_config();
    const bool has_partial_lb_fields = common_config.has_zone_aware_lb_config() ||
                                       common_config.has_locality_weighted_lb_config() ||
                                       common_config.has_consistent_hashing_lb_config();
    if (has_partial_lb_fields) {
      return absl::InvalidArgumentError(
          "cluster: load_balancing_policy cannot be combined with partial fields "
          "(zone_aware_lb_config, "
          "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config");
    }
  }

  // Walk the policies in order; the first one whose factory is registered wins.
  absl::InlinedVector<absl::string_view, 4> unregistered_names;
  for (const auto& policy : config.load_balancing_policy().policies()) {
    auto* candidate = Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>(
        policy.typed_extension_config(), /*is_optional=*/true);
    if (candidate == nullptr) {
      unregistered_names.push_back(policy.typed_extension_config().name());
      continue;
    }
    // Load and validate the configuration.
    auto config_message = candidate->createEmptyConfigProto();
    RETURN_IF_NOT_OK(Config::Utility::translateOpaqueConfig(
        policy.typed_extension_config().typed_config(), context.messageValidationVisitor(),
        *config_message));
    load_balancer_factory_ = candidate;
    auto lb_config_or_error = candidate->loadConfig(context, *config_message);
    RETURN_IF_NOT_OK_REF(lb_config_or_error.status());
    load_balancer_config_ = std::move(lb_config_or_error.value());
    break;
  }

  if (load_balancer_factory_ == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("cluster: didn't find a registered load balancer factory "
                    "implementation for cluster: '{}' with names from [{}]",
                    name_, absl::StrJoin(unregistered_names, ", ")));
  }

  return absl::OkStatus();
}
// Returns the protocol options registered under `name`, or a null shared
// pointer when no entry exists for that extension name.
ProtocolOptionsConfigConstSharedPtr
ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const {
  const auto it = extension_protocol_options_.find(name);
  return it == extension_protocol_options_.end() ? nullptr : it->second;
}
// Builds the upstream transport socket factory for a cluster. When the
// cluster config has no transport_socket, a raw_buffer (plaintext) socket
// config is synthesized so the factory lookup below always has something to
// resolve.
absl::StatusOr<Network::UpstreamTransportSocketFactoryPtr> createTransportSocketFactory(
    const envoy::config::cluster::v3::Cluster& config,
    Server::Configuration::TransportSocketFactoryContext& factory_context) {
  // Work on a by-value copy so the default can be injected without touching
  // the cluster config itself.
  auto socket_config = config.transport_socket();
  if (!config.has_transport_socket()) {
    envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer raw_buffer;
    socket_config.mutable_typed_config()->PackFrom(raw_buffer);
    socket_config.set_name("envoy.transport_sockets.raw_buffer");
  }

  auto& socket_config_factory = Config::Utility::getAndCheckFactory<
      Server::Configuration::UpstreamTransportSocketConfigFactory>(socket_config);
  ProtobufTypes::MessagePtr typed_config = Config::Utility::translateToFactoryConfig(
      socket_config, factory_context.messageValidationVisitor(), socket_config_factory);
  return socket_config_factory.createTransportSocketFactory(*typed_config, factory_context);
}
30202
void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
30202
  for (const auto& filter_config_provider : filter_factories_) {
50
    auto config = filter_config_provider->config();
50
    if (config.has_value()) {
50
      Network::FilterFactoryCb& factory = config.value();
50
      factory(connection);
50
    }
50
  }
30202
}
std::vector<Http::Protocol>
47301
ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
47301
  if (downstream_protocol.has_value() &&
47301
      features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
14
    if (downstream_protocol.value() == Http::Protocol::Http3 &&
14
        !(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1
      return {Http::Protocol::Http2};
1
    }
    // use HTTP11 since HTTP10 upstream is not supported yet.
13
    if (downstream_protocol.value() == Http::Protocol::Http10) {
2
      return {Http::Protocol::Http11};
2
    }
11
    return {downstream_protocol.value()};
13
  }
47287
  if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
94
    if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
37
      return {Http::Protocol::Http2, Http::Protocol::Http11};
37
    }
57
    return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11};
94
  }
47193
  if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
1453
    return {Http::Protocol::Http3};
1453
  }
45740
  return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
45740
                                                               : Http::Protocol::Http11};
47193
}
// Runs the configured outlier-detection HTTP error matcher against the
// response headers. Returns nullopt when no matcher is configured; otherwise
// returns whether the matcher tree (rooted at index 0) matched.
absl::optional<bool>
ClusterInfoImpl::processHttpForOutlierDetection(Http::ResponseHeaderMap& headers) const {
  const auto& matchers = http_protocol_options_->outlier_detection_http_error_matcher_;
  if (matchers.empty()) {
    return absl::nullopt;
  }

  // Construct the status vector at its final size in one shot. The previous
  // code default-constructed, reserved, and then reassigned a freshly sized
  // vector, which made the reserve (and one allocation) pure waste.
  Extensions::Common::Matcher::Matcher::MatchStatusVector statuses(matchers.size());
  matchers[0]->onNewStream(statuses);
  // Run matchers.
  matchers[0]->onHttpResponseHeaders(headers, statuses);
  return absl::optional<bool>(matchers[0]->matchStatus(statuses).matches_);
}
// Determines whether the configured transport socket can carry QUIC: either
// the QUIC transport socket itself, or a QUIC socket wrapped inside the
// HTTP/1.1 proxy transport socket.
absl::StatusOr<bool> validateTransportSocketSupportsQuic(
    const envoy::config::core::v3::TransportSocket& transport_socket) {
  const std::string& socket_name = transport_socket.name();
  if (socket_name == "envoy.transport_sockets.quic") {
    return true;
  }
  if (socket_name != "envoy.transport_sockets.http_11_proxy") {
    return false;
  }

  // Unpack the proxy wrapper and inspect the inner transport socket.
  envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport
      proxy_wrapper;
  RETURN_IF_NOT_OK(MessageUtil::unpackTo(transport_socket.typed_config(), proxy_wrapper));
  return proxy_wrapper.transport_socket().name() == "envoy.transport_sockets.quic";
}
// Base-class constructor for all cluster implementations: builds the transport
// socket factory/matcher, creates the shared ClusterInfoImpl, validates
// ALPN/HTTP3 requirements, and wires up membership stats. Failures are
// reported through `creation_status` (set-and-return) rather than by throwing.
ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
                                 ClusterFactoryContext& cluster_context,
                                 absl::Status& creation_status)
    : init_manager_(fmt::format("Cluster {}", cluster.name())),
      init_watcher_("ClusterImplBase", [this]() { onInitDone(); }),
      runtime_(cluster_context.serverFactoryContext().runtime()),
      wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)),
      random_(cluster_context.serverFactoryContext().api().randomGenerator()),
      // True when this cluster is the cluster manager's designated local cluster.
      local_cluster_(
          cluster_context.serverFactoryContext().clusterManager().localClusterName().value_or("") ==
          cluster.name()),
      const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool(
          cluster_context.serverFactoryContext().singletonManager(),
          cluster_context.serverFactoryContext().mainThreadDispatcher())),
      const_locality_shared_pool_(LocalityPool::getConstLocalitySharedPool(
          cluster_context.serverFactoryContext().singletonManager(),
          cluster_context.serverFactoryContext().mainThreadDispatcher())) {
  auto& server_context = cluster_context.serverFactoryContext();

  auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store());
  transport_factory_context_ =
      std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
          server_context, *stats_scope, cluster_context.messageValidationVisitor());
  transport_factory_context_->setInitManager(init_manager_);

  auto socket_factory_or_error = createTransportSocketFactory(cluster, *transport_factory_context_);
  SET_AND_RETURN_IF_NOT_OK(socket_factory_or_error.status(), creation_status);
  // Keep a raw pointer for the ALPN check below; ownership moves into the
  // matcher created next.
  auto* raw_factory_pointer = socket_factory_or_error.value().get();

  OptRef<const xds::type::matcher::v3::Matcher> matcher;
  if (cluster.has_transport_socket_matcher()) {
    matcher = makeOptRefFromPtr(&cluster.transport_socket_matcher());
  }
  auto socket_matcher_or_error = TransportSocketMatcherImpl::create(
      cluster.transport_socket_matches(), matcher, *transport_factory_context_,
      socket_factory_or_error.value(), *stats_scope);
  SET_AND_RETURN_IF_NOT_OK(socket_matcher_or_error.status(), creation_status);
  auto socket_matcher = std::move(*socket_matcher_or_error);
  const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn();
  auto& dispatcher = server_context.mainThreadDispatcher();

  auto info_or_error =
      ClusterInfoImpl::create(init_manager_, server_context, cluster,
                              cluster_context.serverFactoryContext().clusterManager().bindConfig(),
                              runtime_, std::move(socket_matcher), std::move(stats_scope),
                              cluster_context.addedViaApi(), *transport_factory_context_);
  SET_AND_RETURN_IF_NOT_OK(info_or_error.status(), creation_status);
  // The custom deleter defers destruction of the cluster info to the main
  // dispatcher thread instead of destroying it inline.
  info_ = std::shared_ptr<const ClusterInfoImpl>(
      (*info_or_error).release(), [&dispatcher](const ClusterInfoImpl* self) {
        ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
        dispatcher.deleteInDispatcherThread(
            std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
      });

  // ALPN requires both the default transport socket and every matcher entry to
  // support it (the matcher check can be bypassed via a runtime key).
  if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) {
    if (!raw_factory_pointer->supportsAlpn()) {
      creation_status = absl::InvalidArgumentError(
          fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}",
                      cluster.name(), cluster.DebugString()));
      return;
    }
    if (!matcher_supports_alpn &&
        !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) {
      creation_status = absl::InvalidArgumentError(fmt::format(
          "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}",
          cluster.name(), cluster.DebugString()));
      return;
    }
  }

  // HTTP/3 is only valid with a QUIC-capable transport socket, and only in
  // builds compiled with QUIC support.
  if (info_->features() & ClusterInfoImpl::Features::HTTP3) {
#if defined(ENVOY_ENABLE_QUIC)
    absl::StatusOr<bool> supports_quic =
        validateTransportSocketSupportsQuic(cluster.transport_socket());
    SET_AND_RETURN_IF_NOT_OK(supports_quic.status(), creation_status);
    if (!*supports_quic) {
      creation_status = absl::InvalidArgumentError(
          fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}",
                      cluster.name(), cluster.transport_socket().DebugString()));
      return;
    }
#else
    creation_status = absl::InvalidArgumentError("HTTP3 configured but not enabled in the build.");
    return;
#endif
  }

  // Create the default (empty) priority set before registering callbacks to
  // avoid getting an update the first time it is accessed.
  priority_set_.getOrCreateHostSet(0);
  // Keep membership gauges (total/healthy/degraded/excluded) in sync across
  // all priorities whenever any priority's host set changes.
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
      [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
        if (!hosts_added.empty() || !hosts_removed.empty()) {
          info_->endpointStats().membership_change_.inc();
        }

        uint32_t healthy_hosts = 0;
        uint32_t degraded_hosts = 0;
        uint32_t excluded_hosts = 0;
        uint32_t hosts = 0;
        for (const auto& host_set : prioritySet().hostSetsPerPriority()) {
          hosts += host_set->hosts().size();
          healthy_hosts += host_set->healthyHosts().size();
          degraded_hosts += host_set->degradedHosts().size();
          excluded_hosts += host_set->excludedHosts().size();
        }
        info_->endpointStats().membership_total_.set(hosts);
        info_->endpointStats().membership_healthy_.set(healthy_hosts);
        info_->endpointStats().membership_degraded_.set(degraded_hosts);
        info_->endpointStats().membership_excluded_.set(excluded_hosts);
      });

  // Drop overload configuration parsing.
  SET_AND_RETURN_IF_NOT_OK(parseDropOverloadConfig(cluster.load_assignment()), creation_status);
}
namespace {
38069
bool excludeBasedOnHealthFlag(const Host& host) {
38069
  return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
38069
         host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL) ||
38069
         host.healthFlagGet(Host::HealthFlag::EDS_STATUS_DRAINING);
38069
}
} // namespace
std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
           ExcludedHostVectorConstSharedPtr>
18060
ClusterImplBase::partitionHostList(const HostVector& hosts) {
18060
  auto healthy_list = std::make_shared<HealthyHostVector>();
18060
  auto degraded_list = std::make_shared<DegradedHostVector>();
18060
  auto excluded_list = std::make_shared<ExcludedHostVector>();
18060
  healthy_list->get().reserve(hosts.size());
19283
  for (const auto& host : hosts) {
19259
    const Host::Health health_status = host->coarseHealth();
19259
    if (health_status == Host::Health::Healthy) {
18662
      healthy_list->get().emplace_back(host);
18803
    } else if (health_status == Host::Health::Degraded) {
66
      degraded_list->get().emplace_back(host);
66
    }
19259
    if (excludeBasedOnHealthFlag(*host)) {
16
      excluded_list->get().emplace_back(host);
16
    }
19259
  }
18060
  return std::make_tuple(healthy_list, degraded_list, excluded_list);
18060
}
std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
           HostsPerLocalityConstSharedPtr>
18060
ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) {
18060
  auto filtered_clones =
18998
      hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; },
18998
                    [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; },
18998
                    [](const Host& host) { return excludeBasedOnHealthFlag(host); }});
18060
  return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]),
18060
                         std::move(filtered_clones[2]));
18060
}
47317
bool ClusterInfoImpl::maintenanceMode() const {
47317
  return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0);
47317
}
412050
ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const {
412050
  ASSERT(enumToInt(priority) < resource_managers_.managers_.size());
412050
  return *resource_managers_.managers_[enumToInt(priority)];
412050
}
// Begins cluster initialization. `callback` is stored and invoked when
// initialization fully completes (see finishInitialization(), which throws on
// a non-OK return from the callback).
void ClusterImplBase::initialize(std::function<absl::Status()> callback) {
  // initialize() must be called exactly once, before any pre-init completes.
  ASSERT(!initialization_started_);
  ASSERT(initialization_complete_callback_ == nullptr);
  initialization_complete_callback_ = callback;
  // Hand off to the subclass-specific pre-initialization phase.
  startPreInit();
}
18034
void ClusterImplBase::onPreInitComplete() {
  // Protect against multiple calls.
18034
  if (initialization_started_) {
463
    return;
463
  }
17571
  initialization_started_ = true;
17571
  ENVOY_LOG(debug, "initializing {} cluster {} completed",
17571
            initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary",
17571
            info()->name());
17571
  init_manager_.initialize(init_watcher_);
17571
}
// Invoked once the cluster's init manager completes. If active health
// checking is configured, initialization is held back until every
// health-checkable host finishes its first check; otherwise the cluster
// finishes initializing immediately.
void ClusterImplBase::onInitDone() {
  // Clear the warming gauge now that init targets are done.
  info()->configUpdateStats().warming_state_.set(0);
  if (health_checker_ && pending_initialize_health_checks_ == 0) {
    // Count only hosts that will actually be health checked; hosts with
    // active health checking disabled do not gate initialization.
    for (auto& host_set : prioritySet().hostSetsPerPriority()) {
      for (auto& host : host_set->hosts()) {
        if (host->disableActiveHealthCheck()) {
          continue;
        }
        ++pending_initialize_health_checks_;
      }
    }
    ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}",
              pending_initialize_health_checks_);
    // Each completed first check decrements the pending count; the last one
    // finishes initialization.
    // TODO(mattklein123): Remove this callback when done.
    health_checker_->addHostCheckCompleteCb(
        [this](HostSharedPtr, HealthTransition, HealthState) -> void {
          if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) {
            finishInitialization();
          }
        });
  }
  // No health checker, or no hosts to check: finish right away.
  if (pending_initialize_health_checks_ == 0) {
    finishInitialization();
  }
}
// Completes initialization: clears the stored completion callback, triggers a
// deferred healthy-host rebuild if health checking is active, then runs the
// callback (throwing if it returns a non-OK status).
void ClusterImplBase::finishInitialization() {
  ASSERT(initialization_complete_callback_ != nullptr);
  ASSERT(initialization_started_);
  // Snap a copy of the completion callback so that we can set it to nullptr to unblock
  // reloadHealthyHosts(). See that function for more info on why we do this.
  auto snapped_callback = initialization_complete_callback_;
  initialization_complete_callback_ = nullptr;
  // reloadHealthyHosts() was a no-op while the callback was set; force one
  // rebuild now so the current health-check state is reflected.
  if (health_checker_ != nullptr) {
    reloadHealthyHosts(nullptr);
  }
  if (snapped_callback != nullptr) {
    THROW_IF_NOT_OK(snapped_callback());
  }
}
// Parses the load assignment's drop_overloads policy into drop_overload_ /
// drop_category_. Returns InvalidArgumentError for: more than one category,
// an unknown denominator, a ratio above 1, or an out-of-range runtime limit.
absl::Status ClusterImplBase::parseDropOverloadConfig(
    const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) {
  // Default drop_overload_ to zero.
  drop_overload_ = UnitFloat(0);
  if (!cluster_load_assignment.has_policy()) {
    return absl::OkStatus();
  }
  const auto& policy = cluster_load_assignment.policy();
  if (policy.drop_overloads().empty()) {
    return absl::OkStatus();
  }
  // Only a single drop category is supported (kDropOverloadSize == 1).
  if (policy.drop_overloads().size() > kDropOverloadSize) {
    return absl::InvalidArgumentError(
        fmt::format("Cluster drop_overloads config has {} categories. Envoy only support one.",
                    policy.drop_overloads().size()));
  }
  const auto& drop_percentage = policy.drop_overloads(0).drop_percentage();
  float denominator = 100;
  switch (drop_percentage.denominator()) {
  case envoy::type::v3::FractionalPercent::HUNDRED:
    denominator = 100;
    break;
  case envoy::type::v3::FractionalPercent::TEN_THOUSAND:
    denominator = 10000;
    break;
  case envoy::type::v3::FractionalPercent::MILLION:
    denominator = 1000000;
    break;
  default:
    return absl::InvalidArgumentError(fmt::format(
        "Cluster drop_overloads config denominator setting is invalid : {}. Valid range 0~2.",
        static_cast<int>(drop_percentage.denominator())));
  }
  // If DropOverloadRuntimeKey is not enabled, honor the EDS drop_overload config.
  // If it is enabled, choose the smaller one between it and the EDS config.
  float drop_ratio = float(drop_percentage.numerator()) / (denominator);
  if (drop_ratio > 1) {
    return absl::InvalidArgumentError(
        fmt::format("Cluster drop_overloads config is invalid. drop_ratio={}(Numerator {} / "
                    "Denominator {}). The valid range is 0~1.",
                    drop_ratio, drop_percentage.numerator(), denominator));
  }
  // The runtime key caps the drop ratio; it defaults to 100 (no cap).
  const uint64_t MAX_DROP_OVERLOAD_RUNTIME = 100;
  uint64_t drop_ratio_runtime = runtime_.snapshot().getInteger(
      ClusterImplBase::DropOverloadRuntimeKey, MAX_DROP_OVERLOAD_RUNTIME);
  if (drop_ratio_runtime > MAX_DROP_OVERLOAD_RUNTIME) {
    return absl::InvalidArgumentError(
        fmt::format("load_balancing_policy.drop_overload_limit runtime key config {} is invalid. "
                    "The valid range is 0~100",
                    drop_ratio_runtime));
  }
  drop_ratio = std::min(drop_ratio, float(drop_ratio_runtime) / float(MAX_DROP_OVERLOAD_RUNTIME));
  drop_overload_ = UnitFloat(drop_ratio);
  drop_category_ = policy.drop_overloads(0).category();
  return absl::OkStatus();
}
// Installs the active health checker (at most once per cluster), starts it,
// and wires health-state changes to healthy-host rebuilds.
void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) {
  ASSERT(!health_checker_);
  health_checker_ = health_checker;
  health_checker_->start();
  health_checker_->addHostCheckCompleteCb(
      [this](const HostSharedPtr& host, HealthTransition changed_state, HealthState) -> void {
        // If we get a health check completion that resulted in a state change, signal to
        // update the host sets on all threads.
        if (changed_state == HealthTransition::Changed) {
          reloadHealthyHosts(host);
        }
      });
}
17794
void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) {
17794
  if (!outlier_detector) {
17741
    return;
17741
  }
53
  outlier_detector_ = outlier_detector;
53
  outlier_detector_->addChangedStateCb(
55
      [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); });
53
}
213
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
  // Every time a host changes Health Check state we cause a full healthy host recalculation which
  // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
  // can also block worker threads by doing this repeatedly. There is no reason to do this
  // as we will not start taking traffic until we are initialized. By blocking Health Check
  // updates while initializing we can avoid this.
213
  if (initialization_complete_callback_ != nullptr) {
46
    return;
46
  }
167
  reloadHealthyHostsHelper(host);
167
}
// Re-partitions every priority's current membership (no hosts added or
// removed) so health/degraded/excluded views are recomputed.
void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
  const auto& host_sets = prioritySet().hostSetsPerPriority();
  for (size_t priority = 0; priority < host_sets.size(); ++priority) {
    const auto& host_set = host_sets[priority];
    // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
    HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());
    HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
    // Empty added/removed lists: only the partitioning changes.
    prioritySet().updateHosts(priority,
                              HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
                              host_set->localityWeights(), {}, {}, absl::nullopt, absl::nullopt);
  }
}
// Resolves a proto address to a network address. On failure, STATIC/EDS
// clusters get an augmented error suggesting a DNS cluster type (their
// addresses must be resolvable without DNS); other types propagate the raw
// resolution status.
absl::StatusOr<const Network::Address::InstanceConstSharedPtr>
ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) {
  absl::Status resolve_status;
  // Resolution may either return a status or throw, depending on build mode;
  // both paths funnel into resolve_status.
  TRY_ASSERT_MAIN_THREAD {
    auto address_or_error = Network::Address::resolveProtoAddress(address);
    if (address_or_error.status().ok()) {
      return address_or_error.value();
    }
    resolve_status = address_or_error.status();
  }
  END_TRY
  CATCH(EnvoyException & e, { resolve_status = absl::InvalidArgumentError(e.what()); });
  if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC ||
      info_->type() == envoy::config::cluster::v3::Cluster::EDS) {
    return absl::InvalidArgumentError(
        fmt::format("{}. Consider setting resolver_name or setting cluster type "
                    "to 'STRICT_DNS' or 'LOGICAL_DNS'",
                    resolve_status.message()));
  }
  return resolve_status;
}
absl::Status ClusterImplBase::validateEndpoints(
    absl::Span<const envoy::config::endpoint::v3::LocalityLbEndpoints* const> localities,
    OptRef<const PriorityState> priorities) const {
  // A local cluster must keep every endpoint at priority zero.
  if (local_cluster_) {
    for (const auto* locality_endpoints : localities) {
      if (locality_endpoints->priority() > 0) {
        return absl::InvalidArgumentError(
            fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name()));
      }
    }
  }
  // Delegate per-priority validation to the load balancer config, when both
  // the priority state and an LB config are available.
  if (priorities.has_value()) {
    if (OptRef<const LoadBalancerConfig> lb_config = info_->loadBalancerConfig();
        lb_config.has_value()) {
      return lb_config->validateEndpoints(*priorities);
    }
  }
  return absl::OkStatus();
}
// Optional stat groups are allocated only when the corresponding tracking
// option is enabled in the cluster config; otherwise the pointers stay null.
ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats(
    const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope,
    const ClusterManager& manager)
    : timeout_budget_stats_(
          // Either track_cluster_stats.timeout_budgets or the top-level
          // track_timeout_budgets field enables the timeout budget stats.
          (config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets())
              ? std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats(
                    stats_scope, manager.clusterTimeoutBudgetStatNames()))
              : nullptr),
      request_response_size_stats_(
          (config.track_cluster_stats().request_response_sizes()
               ? std::make_unique<ClusterRequestResponseSizeStats>(generateRequestResponseSizeStats(
                     stats_scope, manager.clusterRequestResponseSizeStatNames()))
               : nullptr)) {}
// Eagerly builds one resource manager per routing priority (DEFAULT and
// HIGH). load() returns a StatusOr; non-OK statuses are converted by
// THROW_OR_RETURN_VALUE.
ClusterInfoImpl::ResourceManagers::ResourceManagers(
    const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
    const std::string& cluster_name, Stats::Scope& stats_scope,
    const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names)
    : circuit_breakers_stat_names_(circuit_breakers_stat_names) {
  managers_[enumToInt(ResourcePriority::Default)] = THROW_OR_RETURN_VALUE(
      load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT),
      ResourceManagerImplPtr);
  managers_[enumToInt(ResourcePriority::High)] = THROW_OR_RETURN_VALUE(
      load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH),
      ResourceManagerImplPtr);
}
ClusterCircuitBreakersStats
ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix,
                                              bool track_remaining,
                                              const ClusterCircuitBreakersStatNames& stat_names) {
  // Builds a gauge named <circuit_breakers>.<prefix>.<stat_name>.
  auto make_gauge = [&stat_names, &scope, prefix](Stats::StatName stat_name) -> Stats::Gauge& {
    return Stats::Utility::gaugeFromElements(scope,
                                             {stat_names.circuit_breakers_, prefix, stat_name},
                                             Stats::Gauge::ImportMode::Accumulate);
  };
  // "remaining_*" gauges are only materialized when tracking is enabled;
  // otherwise the scope's null gauge makes writes no-ops.
  auto remaining_gauge = [&make_gauge, &scope,
                          track_remaining](Stats::StatName stat_name) -> Stats::Gauge& {
    return track_remaining ? make_gauge(stat_name) : scope.store().nullGauge();
  };
  return {
      make_gauge(stat_names.cx_open_),
      make_gauge(stat_names.cx_pool_open_),
      make_gauge(stat_names.rq_open_),
      make_gauge(stat_names.rq_pending_open_),
      make_gauge(stat_names.rq_retry_open_),
      remaining_gauge(stat_names.remaining_cx_),
      remaining_gauge(stat_names.remaining_cx_pools_),
      remaining_gauge(stat_names.remaining_pending_),
      remaining_gauge(stat_names.remaining_retries_),
      remaining_gauge(stat_names.remaining_rq_),
  };
}
21971
Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const {
21971
  return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_);
21971
}
5985
Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const {
5985
  return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_);
5985
}
935
Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const {
935
  return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_);
935
}
#ifdef ENVOY_ENABLE_UHV
// Maps an HTTP protocol to the codec stats object used by the universal
// header validator (UHV builds only).
::Envoy::Http::HeaderValidatorStats&
ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const {
  switch (protocol) {
  case Http::Protocol::Http10:
  case Http::Protocol::Http11:
    return http1CodecStats();
  case Http::Protocol::Http2:
    return http2CodecStats();
  case Http::Protocol::Http3:
    return http3CodecStats();
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
#endif
// Creates a client header validator for the given protocol. In non-UHV builds
// (and in UHV builds without a validator factory) this returns nullptr.
Http::ClientHeaderValidatorPtr
ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const {
#ifdef ENVOY_ENABLE_UHV
  return http_protocol_options_->header_validator_factory_
             ? http_protocol_options_->header_validator_factory_->createClientHeaderValidator(
                   protocol, getHeaderValidatorStats(protocol))
             : nullptr;
#else
  return nullptr;
#endif
}
// Extracts (budget_percent, min_retry_concurrency) from a circuit-breaker
// threshold. Both optionals are engaged only when retry_budget is present;
// fields missing inside retry_budget fall back to 20% / 3.
std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams(
    const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) {
  constexpr double default_budget_percent = 20.0;
  constexpr uint32_t default_retry_concurrency = 3;
  absl::optional<double> budget_percent;
  absl::optional<uint32_t> min_retry_concurrency;
  if (thresholds.has_retry_budget()) {
    // The budget_percent and min_retry_concurrency values are only set if there is a retry budget
    // message set in the cluster config.
    budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent,
                                                     default_budget_percent);
    min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
        thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency);
  }
  return std::make_pair(budget_percent, min_retry_concurrency);
}
SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);

// Returns the server-wide upstream network filter config provider manager,
// creating it through the singleton manager on first use.
std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager(
    Server::Configuration::ServerFactoryContext& context) {
  return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>(
      SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager),
      [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); });
}
// Builds the resource manager for one routing priority from the cluster's
// circuit breaker config. Thresholds absent from config fall back to the
// defaults below. Returns InvalidArgumentError when per_host_thresholds sets
// any field other than max_connections.
absl::StatusOr<ResourceManagerImplPtr>
ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
                                        Runtime::Loader& runtime, const std::string& cluster_name,
                                        Stats::Scope& stats_scope,
                                        const envoy::config::core::v3::RoutingPriority& priority) {
  // Defaults used when no matching threshold entry exists in the config.
  uint64_t max_connections = 1024;
  uint64_t max_pending_requests = 1024;
  uint64_t max_requests = 1024;
  uint64_t max_retries = 3;
  uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max();
  uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max();
  bool track_remaining = false;
  Stats::StatName priority_stat_name;
  std::string priority_name;
  switch (priority) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::config::core::v3::DEFAULT:
    priority_stat_name = circuit_breakers_stat_names_.default_;
    priority_name = "default";
    break;
  case envoy::config::core::v3::HIGH:
    priority_stat_name = circuit_breakers_stat_names_.high_;
    priority_name = "high";
    break;
  }
  // Runtime overrides live under circuit_breakers.<cluster>.<priority>.
  const std::string runtime_prefix =
      fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name);
  const auto& thresholds = config.circuit_breakers().thresholds();
  // Find the threshold entry configured for this priority, if any.
  const auto it = std::find_if(
      thresholds.cbegin(), thresholds.cend(),
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
        return threshold.priority() == priority;
      });
  const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds();
  const auto per_host_it = std::find_if(
      per_host_thresholds.cbegin(), per_host_thresholds.cend(),
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
        return threshold.priority() == priority;
      });
  absl::optional<double> budget_percent;
  absl::optional<uint32_t> min_retry_concurrency;
  if (it != thresholds.cend()) {
    max_connections = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connections, max_connections);
    max_pending_requests =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_pending_requests, max_pending_requests);
    max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_requests, max_requests);
    max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_retries, max_retries);
    track_remaining = it->track_remaining();
    max_connection_pools =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connection_pools, max_connection_pools);
    std::tie(budget_percent, min_retry_concurrency) = ClusterInfoImpl::getRetryBudgetParams(*it);
  }
  if (per_host_it != per_host_thresholds.cend()) {
    // Per-host thresholds only support max_connections; reject anything else.
    if (per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() ||
        per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() ||
        per_host_it->has_retry_budget()) {
      return absl::InvalidArgumentError("Unsupported field in per_host_thresholds");
    }
    if (per_host_it->has_max_connections()) {
      max_connections_per_host = per_host_it->max_connections().value();
    }
  }
  return std::make_unique<ResourceManagerImpl>(
      runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries,
      max_connection_pools, max_connections_per_host,
      ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name,
                                                    track_remaining, circuit_breakers_stat_names_),
      budget_percent, min_retry_concurrency);
}
// `update_cb` may be null; when null, updateClusterPrioritySet() writes
// directly to the parent cluster's priority set.
PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
                                           const LocalInfo::LocalInfo& local_info,
                                           PrioritySet::HostUpdateCb* update_cb)
    : parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb) {}
void PriorityStateManager::initializePriorityFor(
18035
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
18035
  const uint32_t priority = locality_lb_endpoint.priority();
18035
  if (priority_state_.size() <= priority) {
17682
    priority_state_.resize(priority + 1);
17682
  }
18035
  if (priority_state_[priority].first == nullptr) {
17684
    priority_state_[priority].first = std::make_unique<HostVector>();
17684
  }
18035
  if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) {
77
    priority_state_[priority].second[locality_lb_endpoint.locality()] =
77
        locality_lb_endpoint.load_balancing_weight().value();
77
  }
18035
}
// Creates a HostImpl for the given endpoint and records it under the
// endpoint's priority. Endpoint and locality metadata are deduplicated
// through the cluster's shared const-metadata pools.
void PriorityStateManager::registerHostForPriority(
    const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
    const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint) {
  auto endpoint_metadata =
      lb_endpoint.has_metadata()
          ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
          : nullptr;
  auto locality_metadata =
      locality_lb_endpoint.has_metadata()
          ? parent_.constMetadataSharedPool()->getObject(locality_lb_endpoint.metadata())
          : nullptr;
  // HostImpl::create() returns a StatusOr; a non-OK status is converted by
  // THROW_OR_RETURN_VALUE.
  const auto host = std::shared_ptr<HostImpl>(THROW_OR_RETURN_VALUE(
      HostImpl::create(
          parent_.info(), hostname, address, endpoint_metadata, locality_metadata,
          lb_endpoint.load_balancing_weight().value(),
          parent_.constLocalitySharedPool()->getObject(locality_lb_endpoint.locality()),
          lb_endpoint.endpoint().health_check_config(), locality_lb_endpoint.priority(),
          lb_endpoint.health_status(), address_list),
      std::unique_ptr<HostImpl>));
  registerHostForPriority(host, locality_lb_endpoint);
}
void PriorityStateManager::registerHostForPriority(
    const HostSharedPtr& host,
18530
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
18530
  const uint32_t priority = locality_lb_endpoint.priority();
  // Should be called after initializePriorityFor.
18530
  ASSERT(priority_state_[priority].first);
18530
  priority_state_[priority].first->emplace_back(host);
18530
}
// Publishes the accumulated hosts for one priority into the cluster's
// priority set (or the batch update callback, when set), grouping hosts per
// locality with the local locality first and the rest in lexicographic order.
void PriorityStateManager::updateClusterPrioritySet(
    const uint32_t priority, HostVectorSharedPtr&& current_hosts,
    const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed,
    const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
    absl::optional<bool> weighted_priority_health,
    absl::optional<uint32_t> overprovisioning_factor) {
  // If local locality is not defined then skip populating per locality hosts.
  const auto& local_locality = local_info_node_.locality();
  ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString());
  // For non-EDS, most likely the current hosts are from priority_state_[priority].first.
  HostVectorSharedPtr hosts(std::move(current_hosts));
  // Fall back to an empty weights map when this priority has no recorded
  // state (e.g. hosts came from elsewhere).
  LocalityWeightsMap empty_locality_map;
  LocalityWeightsMap& locality_weights_map =
      priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map;
  ASSERT(priority_state_.size() > priority || locality_weights_map.empty());
  LocalityWeightsSharedPtr locality_weights;
  std::vector<HostVector> per_locality;
  // TODO: have the load balancing extension indicate, programmatically, whether it needs locality
  // weights, as an optimization in cases where it doesn't.
  locality_weights = std::make_shared<LocalityWeights>();
  // We use std::map to guarantee a stable ordering for zone aware routing.
  std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality;
  for (const HostSharedPtr& host : *hosts) {
    // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all
    // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire
    // update callbacks to start the health checking process. The endpoint with disabled active
    // health check should not be set FAILED_ACTIVE_HC here.
    if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) {
      host->healthFlagSet(health_checker_flag.value());
    }
    hosts_per_locality[host->locality()].push_back(host);
  }
  // Do we have hosts for the local locality?
  const bool non_empty_local_locality =
      local_info_node_.has_locality() &&
      hosts_per_locality.find(local_locality) != hosts_per_locality.end();
  // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts
  // first if non_empty_local_locality.
  if (non_empty_local_locality) {
    per_locality.emplace_back(hosts_per_locality[local_locality]);
    locality_weights->emplace_back(locality_weights_map[local_locality]);
  }
  // After the local locality hosts (if any), we place the remaining locality host groups in
  // lexicographic order. This provides a stable ordering for zone aware routing.
  for (auto& entry : hosts_per_locality) {
    if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) {
      per_locality.emplace_back(entry.second);
      locality_weights->emplace_back(locality_weights_map[entry.first]);
    }
  }
  auto per_locality_shared =
      std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality);
  // If a batch update callback was provided, use that. Otherwise directly update
  // the PrioritySet. When hosts_added/hosts_removed are not provided, all
  // current hosts are treated as added and none as removed.
  if (update_cb_ != nullptr) {
    update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
                            std::move(locality_weights), hosts_added.value_or(*hosts),
                            hosts_removed.value_or<HostVector>({}), weighted_priority_health,
                            overprovisioning_factor);
  } else {
    parent_.prioritySet().updateHosts(
        priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
        std::move(locality_weights), hosts_added.value_or(*hosts),
        hosts_removed.value_or<HostVector>({}), weighted_priority_health, overprovisioning_factor);
  }
}
// Reconciles the set of hosts for a single priority against a freshly received update.
//
// @param new_hosts the hosts received for this priority in the current update.
// @param current_priority_hosts in/out: on entry, the hosts currently assigned to this
//        priority; on return, the hosts that should remain (existing hosts that were matched
//        plus new hosts, including hosts parked in PENDING_DYNAMIC_REMOVAL).
// @param hosts_added_to_current_priority out: hosts newly added to this priority (brand-new
//        hosts plus existing hosts whose priority changed into this one).
// @param hosts_removed_from_current_priority out: hosts removed from this priority.
// @param all_hosts map of address string -> existing host across all priorities, used to
//        match incoming hosts to already-known Host objects for in-place update.
// @param all_new_hosts address strings of every host in the whole update (all priorities),
//        used to detect hosts that merely moved to a different priority.
// @return true if the host vectors materially changed and the caller must run updateHosts();
//         false if only in-place mutations (e.g. weights) occurred.
bool BaseDynamicClusterImpl::updateDynamicHostList(
    const HostVector& new_hosts, HostVector& current_priority_hosts,
    HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority,
    const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
  // Tracks the largest weight seen across kept/added hosts; published to stats at the end.
  uint64_t max_host_weight = 1;
  // Did hosts change?
  //
  // Have host attributes changed the health of any endpoint? If so, we
  // rebuild the hosts vectors. We only do this if the health status of an
  // endpoint has materially changed (e.g. if previously failing active health
  // checks, we just note it's now failing EDS health status but don't rebuild).
  //
  // TODO(htuch): We can be smarter about this potentially, and not force a full
  // host set update on health status change. The way this would work is to
  // implement a HealthChecker subclass that provides thread local health
  // updates to the Cluster object. This will probably make sense to do in
  // conjunction with https://github.com/envoyproxy/envoy/issues/2874.
  bool hosts_changed = false;
  // Go through and see if the list we have is different from what we just got. If it is, we make
  // a new host list and raise a change notification. We also check for duplicates here. It's
  // possible for DNS to return the same address multiple times, and a bad EDS implementation
  // could do the same thing.
  // Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
  absl::flat_hash_set<std::string> existing_hosts_for_current_priority(
      current_priority_hosts.size());
  // Keep track of hosts we're adding (or replacing)
  absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size());
  // Keep track of hosts for which locality is changed.
  absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority(
      current_priority_hosts.size());
  // Keep track of hosts for which active health check flag is changed.
  absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed(
      current_priority_hosts.size());
  // Hosts that will make up current_priority_hosts when we are done.
  HostVector final_hosts;
  final_hosts.reserve(new_hosts.size() + current_priority_hosts.size());
  for (const HostSharedPtr& host : new_hosts) {
    // To match a new host with an existing host means comparing their addresses.
    const auto host_address_string = addressToString(host->address());
    auto existing_host = all_hosts.find(host_address_string);
    const bool existing_host_found = existing_host != all_hosts.end();
    // Clear any pending deletion flag on an existing host in case it came back while it was
    // being stabilized. We will set it again below if needed.
    if (existing_host_found) {
      existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
    }
    // Check if in-place host update should be skipped, i.e. when the following criteria are met
    // (currently there is only one criterion, but we might add more in the future):
    // - The cluster health checker is activated and a new host is matched with the existing one,
    //   but the health check address is different.
    const bool health_check_address_changed =
        (health_checker_ != nullptr && existing_host_found &&
         *existing_host->second->healthCheckAddress() != *host->healthCheckAddress());
    bool locality_changed = false;
    locality_changed = (existing_host_found &&
                        (!LocalityEqualTo()(host->locality(), existing_host->second->locality())));
    if (locality_changed) {
      hosts_with_updated_locality_for_current_priority.emplace(existing_host->first);
    }
    const bool active_health_check_flag_changed =
        (health_checker_ != nullptr && existing_host_found &&
         existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck());
    if (active_health_check_flag_changed) {
      hosts_with_active_health_check_flag_changed.emplace(existing_host->first);
    }
    // Any of the above attribute changes forces a replace-with-new-Host rather than an
    // in-place mutation of the existing Host object.
    const bool skip_inplace_host_update =
        health_check_address_changed || locality_changed || active_health_check_flag_changed;
    // When there is a match and we decided to do in-place update, we potentially update the
    // host's health check flag and metadata. Afterwards, the host is pushed back into the
    // final_hosts, i.e. hosts that should be preserved in the current priority.
    if (existing_host_found && !skip_inplace_host_update) {
      existing_hosts_for_current_priority.emplace(existing_host->first);
      // If we find a host matched based on address, we keep it. However we do change weight
      // inline so do that here.
      if (host->weight() > max_host_weight) {
        max_host_weight = host->weight();
      }
      if (existing_host->second->weight() != host->weight()) {
        existing_host->second->weight(host->weight());
        // We do full host set rebuilds so that load balancers can do pre-computation of data
        // structures based on host weight. This may become a performance problem in certain
        // deployments so it is runtime feature guarded and may also need to be configurable
        // and/or dynamic in the future.
        hosts_changed = true;
      }
      hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second);
      // Did metadata change? Compare cached hashes for O(1) comparison.
      const bool metadata_changed = host->metadataHash() != existing_host->second->metadataHash();
      if (metadata_changed) {
        // First, update the entire metadata for the endpoint.
        existing_host->second->metadata(host->metadata());
        // Also, given that the canary attribute of an endpoint is derived from its metadata
        // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing
        // to see if it actually changed. We must update this besides just updating the metadata,
        // because it'll be used by the router filter to compute upstream stats.
        existing_host->second->canary(host->canary());
        // If metadata changed, we need to rebuild. See github issue #3810.
        hosts_changed = true;
      }
      // Did the priority change?
      if (host->priority() != existing_host->second->priority()) {
        existing_host->second->priority(host->priority());
        hosts_added_to_current_priority.emplace_back(existing_host->second);
      }
      final_hosts.push_back(existing_host->second);
    } else {
      new_hosts_for_current_priority.emplace(host_address_string);
      if (host->weight() > max_host_weight) {
        max_host_weight = host->weight();
      }
      // If we are depending on a health checker, we initialize to unhealthy.
      // If there's an existing host with the same health checker, the
      // active health-status is kept.
      if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) {
        if (existing_host_found && !health_check_address_changed &&
            !active_health_check_flag_changed) {
          // If there's an existing host, use the same active health-status.
          // The existing host can be marked PENDING_ACTIVE_HC or
          // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC.
          ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
                 existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
          ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) ||
                 existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
          // Copy only the active-HC-related flags from the old host; other flags (e.g. EDS
          // health) come from the new config.
          constexpr uint32_t active_hc_statuses_mask =
              enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) |
              enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) |
              enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) |
              enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
          const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll();
          host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask);
        } else {
          // No previous known host, mark it as failed active HC.
          host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
          // If we want to exclude hosts until they have been health checked, mark them with
          // a flag to indicate that they have not been health checked yet.
          if (info_->warmHosts()) {
            host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
          }
        }
      }
      final_hosts.push_back(host);
      hosts_added_to_current_priority.push_back(host);
    }
  }
  // Remove hosts from current_priority_hosts that were matched to an existing host in the
  // previous loop.
  auto erase_from =
      std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
                     [&existing_hosts_for_current_priority](const HostSharedPtr& p) {
                       auto existing_itr =
                           existing_hosts_for_current_priority.find(p->address()->asString());
                       if (existing_itr != existing_hosts_for_current_priority.end()) {
                         existing_hosts_for_current_priority.erase(existing_itr);
                         return true;
                       }
                       return false;
                     });
  current_priority_hosts.erase(erase_from, current_priority_hosts.end());
  // If we saw existing hosts during this iteration from a different priority, then we've moved
  // a host from another priority into this one, so we should mark the priority as having changed.
  if (!existing_hosts_for_current_priority.empty()) {
    hosts_changed = true;
  }
  // The remaining hosts are hosts that are not referenced in the config update. We remove them
  // from the priority if any of the following is true:
  // - Active health checking is not enabled.
  // - The removed hosts are failing active health checking OR have been explicitly marked as
  //   unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host
  //   or any other future health condition that may be added so we do not use the coarseHealth()
  //   API.
  // - We have explicitly configured the cluster to remove hosts regardless of active health
  // status.
  const bool dont_remove_healthy_hosts =
      health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
  if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
    erase_from = std::remove_if(
        current_priority_hosts.begin(), current_priority_hosts.end(),
        [&all_new_hosts, &new_hosts_for_current_priority,
         &hosts_with_updated_locality_for_current_priority,
         &hosts_with_active_health_check_flag_changed, &final_hosts,
         &max_host_weight](const HostSharedPtr& p) {
          const auto address_string = addressToString(p->address());
          // This host has already been added as a new host in the
          // new_hosts_for_current_priority. Return false here to make sure that host
          // reference with older locality gets cleaned up from the priority.
          if (hosts_with_updated_locality_for_current_priority.contains(address_string)) {
            return false;
          }
          if (hosts_with_active_health_check_flag_changed.contains(address_string)) {
            return false;
          }
          if (all_new_hosts.contains(address_string) &&
              !new_hosts_for_current_priority.contains(address_string)) {
            // If the address is being completely deleted from this priority, but is
            // referenced from another priority, then we assume that the other
            // priority will perform an in-place update to re-use the existing Host.
            // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but
            // instead remove it immediately from this priority.
            // Example: health check address changed and priority also changed
            return false;
          }
          // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active
          // health check, the host is removed immediately from this priority.
          if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
                 p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) &&
              !p->disableActiveHealthCheck()) {
            if (p->weight() > max_host_weight) {
              max_host_weight = p->weight();
            }
            // Keep the host (stabilization period) but flag it so a later update can
            // remove it once it is unhealthy or removal is forced.
            final_hosts.push_back(p);
            p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
            return true;
          }
          return false;
        });
    current_priority_hosts.erase(erase_from, current_priority_hosts.end());
  }
  // At this point we've accounted for all the new hosts as well the hosts that previously
  // existed in this priority.
  info_->endpointStats().max_host_weight_.set(max_host_weight);
  // Whatever remains in current_priority_hosts should be removed.
  if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) {
    hosts_removed_from_current_priority = std::move(current_priority_hosts);
    hosts_changed = true;
  }
  // During the update we populated final_hosts with all the hosts that should remain
  // in the current priority, so move them back into current_priority_hosts.
  current_priority_hosts = std::move(final_hosts);
  // We return false here in the absence of EDS health status or metadata changes, because we
  // have no changes to host vector status (modulo weights). When we have EDS
  // health status or metadata changed, we return true, causing updateHosts() to fire in the
  // caller.
  return hosts_changed;
}
// Translates a cluster config's DNS lookup family enum into the internal
// Network::DnsLookupFamily value via the shared DNS utility.
Network::DnsLookupFamily
getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) {
  const auto configured_family = cluster.dns_lookup_family();
  return DnsUtils::getDnsLookupFamilyFromEnum(configured_family);
}
// Records connection-destroy statistics on the host's cluster: always bumps the
// total destroy counter, plus the remote- or local-close variant depending on
// which side closed the connection.
void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
                             Network::ConnectionEvent event) {
  auto& traffic_stats = *host->cluster().trafficStats();
  traffic_stats.upstream_cx_destroy_.inc();
  const bool closed_by_remote = event == Network::ConnectionEvent::RemoteClose;
  (closed_by_remote ? traffic_stats.upstream_cx_destroy_remote_
                    : traffic_stats.upstream_cx_destroy_local_)
      .inc();
}
// Records statistics for a connection destroyed while it still had active
// requests: bumps the with-active-request destroy counter plus the remote- or
// local-close variant depending on which side closed the connection.
void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
                                          Network::ConnectionEvent event) {
  auto& traffic_stats = *host->cluster().trafficStats();
  traffic_stats.upstream_cx_destroy_with_active_rq_.inc();
  const bool closed_by_remote = event == Network::ConnectionEvent::RemoteClose;
  (closed_by_remote ? traffic_stats.upstream_cx_destroy_remote_with_active_rq_
                    : traffic_stats.upstream_cx_destroy_local_with_active_rq_)
      .inc();
}
// Determines the address to health check a host on.
//
// If the health check config carries an explicit address, that address is used
// (resolved from the proto, throwing on resolution failure), with its port
// replaced by port_value when port_value is non-zero. Otherwise the host's own
// address is reused; a non-zero port_value overrides its port, but only for IP
// addresses (pipes have no port to override).
Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    Network::Address::InstanceConstSharedPtr host_address) {
  const auto& port_override = health_check_config.port_value();
  if (health_check_config.has_address()) {
    auto resolved_or_error = Network::Address::resolveProtoAddress(health_check_config.address());
    THROW_IF_NOT_OK_REF(resolved_or_error.status());
    auto explicit_address = resolved_or_error.value();
    if (port_override == 0) {
      return explicit_address;
    }
    return Network::Utility::getAddressWithPort(*explicit_address, port_override);
  }
  // No explicit address configured: fall back to the host's address.
  if (port_override == 0 || !host_address->ip()) {
    return host_address;
  }
  return Network::Utility::getAddressWithPort(*host_address, port_override);
}
} // namespace Upstream
} // namespace Envoy