1
#include "source/common/upstream/upstream_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <limits>
6
#include <list>
7
#include <memory>
8
#include <string>
9
#include <vector>
10

            
11
#include "envoy/common/optref.h"
12
#include "envoy/config/cluster/v3/circuit_breaker.pb.h"
13
#include "envoy/config/cluster/v3/cluster.pb.h"
14
#include "envoy/config/core/v3/address.pb.h"
15
#include "envoy/config/core/v3/base.pb.h"
16
#include "envoy/config/core/v3/health_check.pb.h"
17
#include "envoy/config/core/v3/protocol.pb.h"
18
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
19
#include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h"
20
#include "envoy/event/dispatcher.h"
21
#include "envoy/event/timer.h"
22
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
23
#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h"
24
#include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h"
25
#include "envoy/init/manager.h"
26
#include "envoy/network/dns.h"
27
#include "envoy/network/transport_socket.h"
28
#include "envoy/registry/registry.h"
29
#include "envoy/secret/secret_manager.h"
30
#include "envoy/server/filter_config.h"
31
#include "envoy/server/transport_socket_config.h"
32
#include "envoy/ssl/context_manager.h"
33
#include "envoy/stats/scope.h"
34
#include "envoy/upstream/health_checker.h"
35
#include "envoy/upstream/upstream.h"
36

            
37
#include "source/common/common/dns_utils.h"
38
#include "source/common/common/enum_to_int.h"
39
#include "source/common/common/fmt.h"
40
#include "source/common/common/utility.h"
41
#include "source/common/config/utility.h"
42
#include "source/common/http/http1/codec_stats.h"
43
#include "source/common/http/http2/codec_stats.h"
44
#include "source/common/http/utility.h"
45
#include "source/common/network/address_impl.h"
46
#include "source/common/network/filter_state_proxy_info.h"
47
#include "source/common/network/happy_eyeballs_connection_impl.h"
48
#include "source/common/network/resolver_impl.h"
49
#include "source/common/network/socket_option_factory.h"
50
#include "source/common/network/socket_option_impl.h"
51
#include "source/common/protobuf/protobuf.h"
52
#include "source/common/protobuf/utility.h"
53
#include "source/common/router/config_impl.h"
54
#include "source/common/router/config_utility.h"
55
#include "source/common/runtime/runtime_features.h"
56
#include "source/common/runtime/runtime_impl.h"
57
#include "source/common/stats/deferred_creation.h"
58
#include "source/common/upstream/cluster_factory_impl.h"
59
#include "source/common/upstream/health_checker_impl.h"
60
#include "source/common/upstream/locality_pool.h"
61
#include "source/server/transport_socket_config_impl.h"
62

            
63
#include "absl/container/node_hash_set.h"
64
#include "absl/strings/str_cat.h"
65

            
66
namespace Envoy {
67
namespace Upstream {
68
namespace {
69
// Returns the default Happy Eyeballs configuration: first address family version DEFAULT and a
// first address family count of 1. The instance is created through CONSTRUCT_ON_FIRST_USE.
const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig&
defaultHappyEyeballsConfig() {
  using HappyEyeballsConfig =
      envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig;
  CONSTRUCT_ON_FIRST_USE(HappyEyeballsConfig, []() -> HappyEyeballsConfig {
    HappyEyeballsConfig defaults;
    defaults.set_first_address_family_version(
        envoy::config::cluster::v3::UpstreamConnectionOptions::DEFAULT);
    defaults.mutable_first_address_family_count()->set_value(1);
    return defaults;
  }());
}
80

            
81
20004
// Renders `address` as a string. A null address yields the empty string.
std::string addressToString(Network::Address::InstanceConstSharedPtr address) {
  return address != nullptr ? address->asString() : std::string();
}
87

            
88
// Builds the protocol options object for the extension registered under `name`.
// @param name the extension name used for the factory lookup.
// @param typed_config the opaque configuration to translate into the factory's proto.
// @param factory_context supplies the validation visitor and is handed to the factory.
// @return the created protocol options, or an InvalidArgument status when no factory is
//         registered under `name` or the factory provides no protocol options proto. Errors
//         from translating `typed_config` are returned as-is.
absl::StatusOr<ProtocolOptionsConfigConstSharedPtr>
createProtocolOptionsConfig(const std::string& name, const Protobuf::Any& typed_config,
                            Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
  // Lookup order: network filter factories, then HTTP filter factories, then standalone
  // protocol options factories.
  Server::Configuration::ProtocolOptionsFactory* options_factory =
      Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory(
          name);
  if (options_factory == nullptr) {
    options_factory =
        Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory(
            name);
    if (options_factory == nullptr) {
      options_factory =
          Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(
              name);
    }
  }
  if (!options_factory) {
    return absl::InvalidArgumentError(
        fmt::format("Didn't find a registered network or http filter or protocol "
                    "options implementation for name: '{}'",
                    name));
  }

  ProtobufTypes::MessagePtr empty_proto = options_factory->createEmptyProtocolOptionsProto();
  if (!empty_proto) {
    return absl::InvalidArgumentError(
        fmt::format("filter {} does not support protocol options", name));
  }

  const absl::Status translation = Envoy::Config::Utility::translateOpaqueConfig(
      typed_config, factory_context.messageValidationVisitor(), *empty_proto);
  if (!translation.ok()) {
    return translation;
  }
  return options_factory->createProtocolOptionsConfig(*empty_proto, factory_context);
}
122

            
123
// Creates the protocol options for every entry in the cluster's
// `typed_extension_protocol_options` map.
// @param config the cluster configuration to read the map from.
// @param factory_context forwarded to createProtocolOptionsConfig for each entry.
// @return a map from extension name to its protocol options (entries whose factory produced a
//         null object are omitted), or the first error encountered.
absl::StatusOr<absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>>
parseExtensionProtocolOptions(
    const envoy::config::cluster::v3::Cluster& config,
    Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
  absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> parsed;

  for (const auto& entry : config.typed_extension_protocol_options()) {
    const auto& extension_name = entry.first;
    auto created = createProtocolOptionsConfig(extension_name, entry.second, factory_context);
    if (!created.ok()) {
      return created.status();
    }
    if (created.value() == nullptr) {
      // The factory accepted the config but produced no options object; nothing to record.
      continue;
    }
    parsed[extension_name] = std::move(created.value());
  }

  return parsed;
}
140

            
141
// Updates the EDS health flags for an existing host to match the new host.
142
// @param updated_host the new host to read health flag values from.
143
// @param existing_host the host to update.
144
// @return bool whether the flag update caused the host health to change.
145
492
bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) {
146
492
  auto updated_eds_health_status = updated_host.edsHealthStatus();
147

            
148
  // Check if the health flag has changed.
149
492
  if (updated_eds_health_status == existing_host.edsHealthStatus()) {
150
460
    return false;
151
460
  }
152

            
153
32
  const auto previous_health = existing_host.coarseHealth();
154
32
  existing_host.setEdsHealthStatus(updated_eds_health_status);
155

            
156
  // Rebuild if changing the flag affected the host health.
157
32
  return previous_health != existing_host.coarseHealth();
158
492
}
159

            
160
// Converts a set of hosts into a HostVector, excluding certain hosts.
161
// @param hosts hosts to convert
162
// @param excluded_hosts hosts to exclude from the resulting vector.
163
HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts,
164
1586
                       const absl::node_hash_set<HostSharedPtr>& excluded_hosts) {
165
1586
  HostVector net_hosts;
166
1586
  net_hosts.reserve(hosts.size());
167

            
168
1781
  for (const auto& host : hosts) {
169
1162
    if (excluded_hosts.find(host) == excluded_hosts.end()) {
170
1052
      net_hosts.emplace_back(host);
171
1052
    }
172
1162
  }
173

            
174
1586
  return net_hosts;
175
1586
}
176

            
177
// Creates the stats scope "cluster.<name>." for a cluster, where <name> is the cluster's
// `alt_stat_name` when it is non-empty and the cluster `name` otherwise.
Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config,
                                         Stats::Store& stats) {
  const std::string& stat_name =
      config.alt_stat_name().empty() ? config.name() : config.alt_stat_name();
  return stats.createScope(fmt::format("cluster.{}.", stat_name));
}
182

            
183
// Builds the socket options shared by every upstream local address of a cluster: the
// no-SIGPIPE option (when the platform defines it), IP_FREEBIND and TCP keepalive.
// @param cluster_config the cluster configuration.
// @param bootstrap_bind_config the cluster manager wide bind config.
// @return a newly allocated option list (possibly empty).
Network::ConnectionSocket::OptionsSharedPtr
buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
                       const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
  auto options = std::make_shared<Network::ConnectionSocket::Options>();

  // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
  // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
  if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
    Network::Socket::appendOptions(options,
                                   Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
  }

  // Cluster IP_FREEBIND settings, when set, will override the cluster manager wide settings.
  const auto& cluster_bind_config = cluster_config.upstream_bind_config();
  const bool use_freebind = cluster_bind_config.has_freebind()
                                ? cluster_bind_config.freebind().value()
                                : bootstrap_bind_config.freebind().value();
  if (use_freebind) {
    Network::Socket::appendOptions(options,
                                   Network::SocketOptionFactory::buildIpFreebindOptions());
  }

  const auto& connection_options = cluster_config.upstream_connection_options();
  if (connection_options.has_tcp_keepalive()) {
    Network::Socket::appendOptions(
        options, Network::SocketOptionFactory::buildTcpKeepaliveOptions(
                     Network::parseTcpKeepaliveConfig(connection_options.tcp_keepalive())));
  }

  return options;
}
211

            
212
// Builds the literal socket options configured for a cluster.
// @param cluster_config the cluster configuration; its `upstream_bind_config.socket_options`
//        take precedence when non-empty.
// @param bootstrap_bind_config the cluster manager wide bind config, used as the fallback.
// @return a newly allocated option list; empty when neither config specifies socket options.
Network::ConnectionSocket::OptionsSharedPtr
buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
                          const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
  Network::ConnectionSocket::OptionsSharedPtr cluster_options =
      std::make_shared<Network::ConnectionSocket::Options>();

  const auto& cluster_level_options = cluster_config.upstream_bind_config().socket_options();
  const auto& bootstrap_level_options = bootstrap_bind_config.socket_options();
  if (cluster_level_options.empty() && bootstrap_level_options.empty()) {
    return cluster_options;
  }

  // Cluster socket_options trump cluster manager wide. Bind by reference: both operands are
  // lvalues of the same type, so no copy of the repeated field is made.
  const auto& socket_options =
      !cluster_level_options.empty() ? cluster_level_options : bootstrap_level_options;
  Network::Socket::appendOptions(
      cluster_options, Network::SocketOptionFactory::buildLiteralOptions(socket_options));
  return cluster_options;
}
229

            
230
// Translates a bind config into the list of upstream local addresses to bind to.
// @param bind_config the effective bind config, if any.
// @param cluster_name the cluster name when the bind config comes from a cluster; unset when it
//        comes from the bootstrap. Only used for error messages.
// @param base_socket_options options appended first to every produced address.
// @param cluster_socket_options options appended after the base options, except for an extra
//        source address that carries its own socket options.
// @return one entry for the source address (its address is null when no source address is set,
//         or when there is no bind config at all), followed by the extra and then the
//         additional source addresses. Returns an error when an address fails to resolve, or
//         when more than one entry exists and any of them has a null address.
absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>>
parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config,
                const absl::optional<std::string>& cluster_name,
                Network::ConnectionSocket::OptionsSharedPtr base_socket_options,
                Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) {
  // Creates a local address entry for `address` (may be null) seeded with the base options.
  const auto make_local_address =
      [&base_socket_options](Network::Address::InstanceConstSharedPtr address) {
        UpstreamLocalAddress local_address;
        local_address.address_ = std::move(address);
        local_address.socket_options_ =
            std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
        ::Envoy::Network::Socket::appendOptions(local_address.socket_options_,
                                                base_socket_options);
        return local_address;
      };

  std::vector<::Envoy::Upstream::UpstreamLocalAddress> upstream_local_addresses;
  if (bind_config.has_value()) {
    upstream_local_addresses.reserve(
        1 + static_cast<size_t>(bind_config->extra_source_addresses_size()) +
        static_cast<size_t>(bind_config->additional_source_addresses_size()));

    // Primary entry: the source address, or a null address when none is configured.
    Network::Address::InstanceConstSharedPtr source_address;
    if (bind_config->has_source_address()) {
      auto address_or_error =
          ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address());
      RETURN_IF_NOT_OK_REF(address_or_error.status());
      source_address = std::move(address_or_error.value());
    }
    UpstreamLocalAddress upstream_local_address = make_local_address(std::move(source_address));
    ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
                                            cluster_socket_options);
    upstream_local_addresses.push_back(std::move(upstream_local_address));

    for (const auto& extra_source_address : bind_config->extra_source_addresses()) {
      auto address_or_error =
          ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address());
      RETURN_IF_NOT_OK_REF(address_or_error.status());
      UpstreamLocalAddress extra_upstream_local_address =
          make_local_address(std::move(address_or_error.value()));

      // Per-address socket options replace the cluster level ones for this address.
      if (extra_source_address.has_socket_options()) {
        ::Envoy::Network::Socket::appendOptions(
            extra_upstream_local_address.socket_options_,
            ::Envoy::Network::SocketOptionFactory::buildLiteralOptions(
                extra_source_address.socket_options().socket_options()));
      } else {
        ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
                                                cluster_socket_options);
      }
      upstream_local_addresses.push_back(std::move(extra_upstream_local_address));
    }

    for (const auto& additional_source_address : bind_config->additional_source_addresses()) {
      auto address_or_error =
          ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address);
      RETURN_IF_NOT_OK_REF(address_or_error.status());
      UpstreamLocalAddress additional_upstream_local_address =
          make_local_address(std::move(address_or_error.value()));
      ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
                                              cluster_socket_options);
      upstream_local_addresses.push_back(std::move(additional_upstream_local_address));
    }
  } else {
    // If there is no bind config specified, then return a nullptr for the address.
    UpstreamLocalAddress local_address = make_local_address(nullptr);
    ::Envoy::Network::Socket::appendOptions(local_address.socket_options_,
                                            cluster_socket_options);
    upstream_local_addresses.push_back(std::move(local_address));
  }

  // Verify that we have valid addresses if size is greater than 1.
  if (upstream_local_addresses.size() > 1) {
    for (auto const& upstream_local_address : upstream_local_addresses) {
      if (upstream_local_address.address_ == nullptr) {
        return absl::InvalidArgumentError(fmt::format(
            "{}'s upstream binding config has invalid IP addresses.",
            !(cluster_name.has_value()) ? "Bootstrap"
                                        : fmt::format("Cluster {}", cluster_name.value())));
      }
    }
  }

  return upstream_local_addresses;
}
318

            
319
// Creates the upstream local address selector for a cluster.
// @param cluster_config the cluster configuration. Its `upstream_bind_config`, when present,
//        completely overrides the bootstrap bind config.
// @param bootstrap_bind_config the cluster manager wide bind config, if any.
// @return the selector created by the configured (or default) selector factory, or an
//         InvalidArgument status when the bind config is inconsistent: both
//         `extra_source_addresses` and `additional_source_addresses` set, extra/additional
//         addresses without a `source_address`, or (on non-Linux builds) any address carrying a
//         network namespace filepath. Errors from parsing the bind config or from the selector
//         factory are propagated.
absl::StatusOr<Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr>
createUpstreamLocalAddressSelector(
    const envoy::config::cluster::v3::Cluster& cluster_config,
    const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) {

  // Use the cluster bind config if specified. This completely overrides the
  // bootstrap bind config when present.
  OptRef<const envoy::config::core::v3::BindConfig> bind_config;
  absl::optional<std::string> cluster_name;
  if (cluster_config.has_upstream_bind_config()) {
    bind_config.emplace(cluster_config.upstream_bind_config());
    cluster_name.emplace(cluster_config.name());
  } else if (bootstrap_bind_config.has_value()) {
    bind_config.emplace(*bootstrap_bind_config);
  }

  // Names the owner of the bind config in error messages. Evaluated lazily so that the string is
  // only formatted when an error is actually reported.
  const auto config_owner = [&cluster_name]() -> std::string {
    return !(cluster_name.has_value()) ? "Bootstrap"
                                       : fmt::format("Cluster {}", cluster_name.value());
  };

  // Verify that bind config is valid.
  if (bind_config.has_value()) {
    if (bind_config->additional_source_addresses_size() > 0 &&
        bind_config->extra_source_addresses_size() > 0) {
      return absl::InvalidArgumentError(fmt::format(
          "Can't specify both `extra_source_addresses` and `additional_source_addresses` "
          "in the {}'s upstream binding config",
          config_owner()));
    }

    if (!bind_config->has_source_address() &&
        (bind_config->extra_source_addresses_size() > 0 ||
         bind_config->additional_source_addresses_size() > 0)) {
      return absl::InvalidArgumentError(fmt::format(
          "{}'s upstream binding config has extra/additional source addresses but no "
          "source_address. Extra/additional addresses cannot be specified if "
          "source_address is not set.",
          config_owner()));
    }

#if !defined(__linux__)
    // Reports whether any configured address names a network namespace filepath.
    const auto uses_network_namespace = [&bind_config]() {
      if (bind_config->has_source_address() &&
          !bind_config->source_address().network_namespace_filepath().empty()) {
        return true;
      }
      for (const auto& addr : bind_config->extra_source_addresses()) {
        if (addr.has_address() && !addr.address().network_namespace_filepath().empty()) {
          return true;
        }
      }
      for (const auto& addr : bind_config->additional_source_addresses()) {
        if (!addr.network_namespace_filepath().empty()) {
          return true;
        }
      }
      return false;
    };
    if (uses_network_namespace()) {
      return absl::InvalidArgumentError(fmt::format(
          "{}'s upstream binding config contains addresses with network namespace filepaths, but the "
          "OS is not Linux. Network namespaces can only be used on Linux.",
          config_owner()));
    }
#endif
  }

  UpstreamLocalAddressSelectorFactory* local_address_selector_factory = nullptr;
  const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config =
      bind_config.has_value() && bind_config->has_local_address_selector()
          ? &bind_config->local_address_selector()
          : nullptr;
  if (local_address_selector_config) {
    local_address_selector_factory =
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(
            *local_address_selector_config, false);
  } else {
    // Create the default local address selector if one was not specified.
    envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config;
    envoy::config::core::v3::TypedExtensionConfig typed_extension;
    typed_extension.mutable_typed_config()->PackFrom(default_config);
    local_address_selector_factory =
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension,
                                                                                 false);
  }

  // Socket options are always derived from the bootstrap bind config (or an empty one when the
  // bootstrap has none). Bind by reference instead of copying the message for each call.
  const envoy::config::core::v3::BindConfig& effective_bootstrap_bind_config =
      bootstrap_bind_config.has_value() ? *bootstrap_bind_config
                                        : envoy::config::core::v3::BindConfig::default_instance();
  absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>> config_or_error =
      parseBindConfig(bind_config, cluster_name,
                      buildBaseSocketOptions(cluster_config, effective_bootstrap_bind_config),
                      buildClusterSocketOptions(cluster_config, effective_bootstrap_bind_config));
  RETURN_IF_NOT_OK_REF(config_or_error.status());
  auto selector_or_error = local_address_selector_factory->createLocalAddressSelector(
      config_or_error.value(), cluster_name);
  RETURN_IF_NOT_OK_REF(selector_or_error.status());
  return selector_or_error.value();
}
410

            
411
} // namespace
412

            
413
// Allow disabling ALPN checks for transport sockets. See
414
// https://github.com/envoyproxy/envoy/issues/22876
415
const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey =
416
    "config.do_not_validate_alpn_support";
417

            
418
// Runtime key used to override the drop_overload ratio settings received from EDS.
419
const absl::string_view ClusterImplBase::DropOverloadRuntimeKey =
420
    "load_balancing_policy.drop_overload_limit";
421

            
422
// TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the
423
// stat name into every host that receives a copy of that metric. This can be improved by putting
424
// a single copy of the stat name into a thread-local key->index map so that the lock can be avoided
425
// and using the index as the key to the stat map instead.
426
103
// Records one sample of the metric `key`: bumps the per-key request count and adds `value` to
// the per-key running total. The stat map is allocated on demand, under `mu_`.
void LoadMetricStatsImpl::add(const absl::string_view key, double value) {
  absl::MutexLock lock(mu_);
  if (!map_) {
    map_ = std::make_unique<StatMap>();
  }
  Stat& entry = (*map_)[key];
  entry.total_metric_value += value;
  ++entry.num_requests_with_metric;
}
435

            
436
90
// Hands the accumulated stat map to the caller and leaves this object with no map, all while
// holding `mu_`. Returns a null pointer when nothing was recorded since the previous latch.
LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() {
  absl::MutexLock lock(mu_);
  StatMapPtr taken;
  // Swapping with an empty pointer transfers ownership out and resets `map_` in one step.
  taken.swap(map_);
  return taken;
}
442

            
443
// Factory for HostDescriptionImpl. Forwards every argument to the constructor and returns the
// new object, or the status the constructor reported through its `creation_status` argument.
absl::StatusOr<std::unique_ptr<HostDescriptionImpl>> HostDescriptionImpl::create(
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, const AddressVector& address_list) {
  absl::Status status = absl::OkStatus();
  // The constructor is invoked with `new` directly rather than through std::make_unique.
  std::unique_ptr<HostDescriptionImpl> host(new HostDescriptionImpl(
      status, cluster, hostname, dest_address, endpoint_metadata, locality_metadata, locality,
      health_check_config, priority, address_list));
  if (!status.ok()) {
    return status;
  }
  return host;
}
457

            
458
// Constructs a host description. Shared state is initialized by HostDescriptionImplBase, which
// also reports configuration errors through `creation_status`. This class additionally stores
// the destination address, the address list (or null, see makeAddressListOrNull) and the health
// check address derived from `health_check_config` and `dest_address`.
HostDescriptionImpl::HostDescriptionImpl(
    absl::Status& creation_status, ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, const AddressVector& address_list)
    : HostDescriptionImplBase(cluster, hostname, dest_address, endpoint_metadata, locality_metadata,
                              locality, health_check_config, priority, creation_status),
      address_(dest_address),
      address_list_or_null_(makeAddressListOrNull(dest_address, address_list)),
      health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {
  // All work happens in the initializer list.
}
470

            
471
// Initializes the state shared by all host descriptions and validates the host configuration.
// @param cluster the owning cluster; also supplies the symbol table for the zone stat name.
// @param hostname the host name.
// @param dest_address the destination address; may be null, in which case the address checks
//        below are skipped.
// @param endpoint_metadata endpoint metadata; may be null (its hash is then 0).
// @param locality_metadata locality metadata.
// @param locality the host locality; dereferenced here to read the zone.
// @param health_check_config supplies the health check hostname and port.
// @param priority the host priority.
// @param creation_status set to InvalidArgument when a non-zero health check port is combined
//        with a non-IP address, or when the address specifies a network namespace. Left
//        untouched otherwise.
HostDescriptionImplBase::HostDescriptionImplBase(
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, absl::Status& creation_status)
    : cluster_(cluster), hostname_(hostname),
      health_checks_hostname_(health_check_config.hostname()),
      canary_(Config::Metadata::metadataValue(endpoint_metadata.get(),
                                              Config::MetadataFilters::get().ENVOY_LB,
                                              Config::MetadataEnvoyLbKeys::get().CANARY)
                  .bool_value()),
      endpoint_metadata_(endpoint_metadata),
      endpoint_metadata_hash_(endpoint_metadata ? MessageUtil::hash(*endpoint_metadata) : 0),
      locality_metadata_(locality_metadata), locality_(std::move(locality)),
      locality_zone_stat_name_(locality_->zone(), cluster->statsScope().symbolTable()),
      priority_(priority),
      socket_factory_(resolveTransportSocketFactory(dest_address, endpoint_metadata_.get())) {
  // A null destination address is tolerated elsewhere in this class (see
  // resolveTransportSocketFactory and makeAddressListOrNull), so never dereference it here.
  if (dest_address == nullptr) {
    return;
  }

  if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
    // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
    // for a pipe address is a misconfiguration.
    creation_status = absl::InvalidArgumentError(
        "Invalid host configuration: non-zero port for non-IP address");
  } else if (dest_address->networkNamespace().has_value()) {
    creation_status = absl::InvalidArgumentError(
        "Invalid host configuration: hosts cannot specify network namespaces with their address");
  }
}
500

            
501
// Returns a shared copy of `address_list`, or an empty (null) pointer when `address` is null or
// the list is empty. A non-empty list is asserted to start with `address`.
HostDescription::SharedConstAddressVector HostDescriptionImplBase::makeAddressListOrNull(
    const Network::Address::InstanceConstSharedPtr& address, const AddressVector& address_list) {
  if (address && !address_list.empty()) {
    ASSERT(*address_list.front() == *address);
    return std::make_shared<AddressVector>(address_list);
  }
  return {};
}
509

            
510
// Picks the transport socket factory for a host by asking the cluster's transport socket
// matcher, using the endpoint metadata, this host's locality metadata and the given transport
// socket options. Increments the match's total match counter and logs the selection at debug
// level.
// @param dest_address used only for the log line; a null address is logged as "empty".
// @param endpoint_metadata endpoint metadata passed to the matcher.
// @param transport_socket_options options passed to the matcher.
// @return the factory of the resolved match.
Network::UpstreamTransportSocketFactory& HostDescriptionImplBase::resolveTransportSocketFactory(
    const Network::Address::InstanceConstSharedPtr& dest_address,
    const envoy::config::core::v3::Metadata* endpoint_metadata,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
  auto resolved = cluster_->transportSocketMatcher().resolve(
      endpoint_metadata, locality_metadata_.get(), transport_socket_options);
  resolved.stats_.total_match_count_.inc();

  ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}",
            resolved.name_, dest_address ? dest_address->asString() : "empty");

  return resolved.factory_;
}
522

            
523
// Creates an upstream connection to this host's address.
// The transport socket factory is re-resolved for this connection only when the cluster's
// transport socket matcher uses filter state and the supplied options carry downstream shared
// filter state objects; otherwise the host's own factory is used.
Host::CreateConnectionData HostImplBase::createConnection(
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
  bool resolve_per_connection = false;
  if (cluster().transportSocketMatcher().usesFilterState() && transport_socket_options) {
    resolve_per_connection =
        !transport_socket_options->downstreamSharedFilterStateObjects().empty();
  }

  Network::UpstreamTransportSocketFactory* socket_factory = nullptr;
  if (resolve_per_connection) {
    socket_factory =
        &resolveTransportSocketFactory(address(), metadata().get(), transport_socket_options);
  } else {
    socket_factory = &transportSocketFactory();
  }

  return createConnection(dispatcher, cluster(), address(), addressListOrNull(), *socket_factory,
                          options, transport_socket_options, shared_from_this());
}
538

            
539
26699
// Replaces the host's EDS-derived health flags with the one implied by `health_status`.
// UNHEALTHY and TIMEOUT map to FAILED_EDS_HEALTH, DRAINING to EDS_STATUS_DRAINING and
// DEGRADED to DEGRADED_EDS_HEALTH; every other status leaves all three flags cleared.
void HostImplBase::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) {
  // Drop every EDS health flag first so that stale state never survives an update.
  for (const Host::HealthFlag stale_flag :
       {Host::HealthFlag::FAILED_EDS_HEALTH, Host::HealthFlag::DEGRADED_EDS_HEALTH,
        Host::HealthFlag::EDS_STATUS_DRAINING}) {
    HostImplBase::healthFlagClear(stale_flag);
  }

  // Work out which flag, if any, the new status maps to.
  absl::optional<Host::HealthFlag> new_flag;
  switch (health_status) {
  case envoy::config::core::v3::UNHEALTHY:
  case envoy::config::core::v3::TIMEOUT:
    new_flag = Host::HealthFlag::FAILED_EDS_HEALTH;
    break;
  case envoy::config::core::v3::DRAINING:
    new_flag = Host::HealthFlag::EDS_STATUS_DRAINING;
    break;
  case envoy::config::core::v3::DEGRADED:
    new_flag = Host::HealthFlag::DEGRADED_EDS_HEALTH;
    break;
  default:
    // No health flags should be set.
    break;
  }

  if (new_flag.has_value()) {
    HostImplBase::healthFlagSet(new_flag.value());
  }
}
563

            
564
// Creates a connection to the host's health check address. When `metadata` is supplied the
// transport socket factory is resolved from it; otherwise the host's own factory is used.
// No address list and no socket options are passed on to the underlying connection creation.
Host::CreateConnectionData HostImplBase::createHealthCheckConnection(
    Event::Dispatcher& dispatcher,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    const envoy::config::core::v3::Metadata* metadata) const {
  const bool has_metadata = metadata != nullptr;
  Network::UpstreamTransportSocketFactory& factory =
      has_metadata
          ? resolveTransportSocketFactory(healthCheckAddress(), metadata, transport_socket_options)
          : transportSocketFactory();

  return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, factory, nullptr,
                          transport_socket_options, shared_from_this());
}
575

            
576
// Determines whether a connection to `host` should be redirected to an HTTP/1.1 proxy and, if
// so, returns the proxy's address. Sources are consulted in this order:
//   1. the proxy info carried by the transport socket options,
//   2. the host metadata, then the locality metadata (typed filter metadata entry keyed by
//      ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR),
//   3. the default proxy info of the transport socket factory.
// Returns absl::nullopt when no source yields an address, or when a metadata entry is present
// but cannot be unpacked or resolved.
absl::optional<Network::Address::InstanceConstSharedPtr> HostImplBase::maybeGetProxyRedirectAddress(
    const Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    HostDescriptionConstSharedPtr host,
    const Network::UpstreamTransportSocketFactory& socket_factory) {
  const bool options_carry_proxy =
      transport_socket_options && transport_socket_options->http11ProxyInfo().has_value();
  if (options_carry_proxy) {
    return transport_socket_options->http11ProxyInfo()->proxy_address;
  }

  // See if host metadata contains a proxy address and only check locality metadata if host
  // metadata did not have the relevant key.
  for (const auto& candidate : {host->metadata(), host->localityMetadata()}) {
    if (candidate == nullptr) {
      continue;
    }

    const auto& typed_metadata = candidate->typed_filter_metadata();
    auto entry = typed_metadata.find(
        Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR);
    if (entry == typed_metadata.end()) {
      continue;
    }

    // The entry exists: from here on this metadata source is authoritative, so any failure
    // below ends the search instead of falling through to the next source.
    envoy::config::core::v3::Address proxy_proto;
    auto unpack_status = MessageUtil::unpackTo(entry->second, proxy_proto);
    if (!unpack_status.ok()) {
      ENVOY_LOG_EVERY_POW_2(
          error, "failed to parse proto from endpoint/locality metadata field {}, host={}",
          Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR,
          host->hostname());
      return absl::nullopt;
    }

    auto resolved = Network::Address::resolveProtoAddress(proxy_proto);
    if (!resolved.ok()) {
      ENVOY_LOG_EVERY_POW_2(
          error, "failed to resolve address from endpoint/locality metadata field {}, host={}",
          Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR,
          host->hostname());
      return absl::nullopt;
    }

    return resolved.value();
  }

  // Nothing in the metadata: fall back to the factory-wide default proxy, if one is set.
  if (!socket_factory.defaultHttp11ProxyInfo().has_value()) {
    return absl::nullopt;
  }
  return socket_factory.defaultHttp11ProxyInfo()->proxy_address;
}
629

            
630
// Creates the client connection for `host` and applies the cluster's per-connection settings.
//
// The remote endpoint is chosen as follows:
//   - if a proxy redirect address is found, connect to the proxy instead of the host;
//   - otherwise, if an address list with more than one entry is supplied, use a happy
//     eyeballs connection over that list;
//   - otherwise connect directly to `address`.
// In the first and last case the upstream local (source) address is selected for `address`.
//
// After creation the connection gets the interface-name setting, the buffer limit, an
// optional high watermark timeout (only when positive), the upstream host on its upstream
// info (when present) and the cluster's network filter chain.
Host::CreateConnectionData HostImplBase::createConnection(
    Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
    const Network::Address::InstanceConstSharedPtr& address,
    const SharedConstAddressVector& address_list_or_null,
    Network::UpstreamTransportSocketFactory& socket_factory,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    HostDescriptionConstSharedPtr host) {
  auto local_address_selector = cluster.getUpstreamLocalAddressSelector();

  absl::optional<Network::Address::InstanceConstSharedPtr> proxy_address =
      maybeGetProxyRedirectAddress(transport_socket_options, host, socket_factory);

  // A proxy redirect always wins over happy eyeballs.
  const bool via_proxy = proxy_address.has_value();
  const bool use_happy_eyeballs =
      !via_proxy && address_list_or_null != nullptr && address_list_or_null->size() > 1;

  Network::ClientConnectionPtr connection;
  if (use_happy_eyeballs) {
    ENVOY_LOG(debug, "Upstream using happy eyeballs config.");
    const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig&
        happy_eyeballs_config =
            cluster.happyEyeballsConfig().has_value() ? *cluster.happyEyeballsConfig()
                                                      : defaultHappyEyeballsConfig();
    connection = std::make_unique<Network::HappyEyeballsConnectionImpl>(
        dispatcher, *address_list_or_null, local_address_selector, socket_factory,
        transport_socket_options, host, options, happy_eyeballs_config);
  } else {
    // The source address is selected for the host's address even when the TCP connection
    // itself goes to the proxy.
    auto upstream_local_address = local_address_selector->getUpstreamLocalAddress(
        address, options, makeOptRefFromPtr(transport_socket_options.get()));
    if (via_proxy) {
      ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}",
                proxy_address.value()->asString());
    }
    const Network::Address::InstanceConstSharedPtr& remote_address =
        via_proxy ? proxy_address.value() : address;
    connection = dispatcher.createClientConnection(
        remote_address, upstream_local_address.address_,
        socket_factory.createTransportSocket(transport_socket_options, host),
        upstream_local_address.socket_options_, transport_socket_options);
  }

  connection->connectionInfoSetter().enableSettingInterfaceName(
      cluster.setLocalInterfaceNameOnUpstreamConnections());
  connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());

  const auto high_watermark_timeout = cluster.perConnectionBufferHighWatermarkTimeout();
  if (high_watermark_timeout.count() > 0) {
    connection->setBufferHighWatermarkTimeout(high_watermark_timeout);
  }

  if (auto upstream_info = connection->streamInfo().upstreamInfo(); upstream_info) {
    upstream_info->setUpstreamHost(host);
  }

  cluster.createNetworkFilterChain(*connection);
  return {std::move(connection), std::move(host)};
}
687

            
688
26880
// Sets the host weight, raising a requested weight of zero to the minimum of 1.
void HostImplBase::weight(uint32_t new_weight) {
  weight_ = (new_weight < 1U) ? 1U : new_weight;
}
689

            
690
// Factory for HostImpl. Construction problems are reported by the constructor through a
// status out-parameter; a non-OK status is returned to the caller instead of the host.
absl::StatusOr<std::unique_ptr<HostImpl>> HostImpl::create(
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr address, MetadataConstSharedPtr endpoint_metadata,
    MetadataConstSharedPtr locality_metadata, uint32_t initial_weight,
    std::shared_ptr<const envoy::config::core::v3::Locality> locality,
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    uint32_t priority, const envoy::config::core::v3::HealthStatus health_status,
    const AddressVector& address_list) {
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<HostImpl> host(
      new HostImpl(creation_status, cluster, hostname, address, endpoint_metadata,
                   locality_metadata, initial_weight, locality, health_check_config, priority,
                   health_status, address_list));
  RETURN_IF_NOT_OK(creation_status);
  return host;
}
705

            
706
std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter(
707
20658
    const std::vector<std::function<bool(const Host&)>>& predicates) const {
708
  // We keep two lists: one for being able to mutate the clone and one for returning to the
709
  // caller. Creating them both at the start avoids iterating over the mutable values at the end
710
  // to convert them to a const pointer.
711
20658
  std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones;
712
20658
  std::vector<HostsPerLocalityConstSharedPtr> filtered_clones;
713

            
714
20658
  mutable_clones.reserve(predicates.size());
715
20658
  filtered_clones.reserve(predicates.size());
716
77350
  for (size_t i = 0; i < predicates.size(); ++i) {
717
56692
    mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>());
718
56692
    filtered_clones.emplace_back(mutable_clones.back());
719
56692
    mutable_clones.back()->local_ = local_;
720
56692
  }
721

            
722
20689
  for (const auto& hosts_locality : hosts_per_locality_) {
723
19253
    std::vector<HostVector> current_locality_hosts;
724
19253
    current_locality_hosts.resize(predicates.size());
725

            
726
    // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop.
727
21496
    for (const auto& host : hosts_locality) {
728
80342
      for (size_t i = 0; i < predicates.size(); ++i) {
729
58940
        if (predicates[i](*host)) {
730
19798
          current_locality_hosts[i].emplace_back(host);
731
19798
        }
732
58940
      }
733
21402
    }
734

            
735
74064
    for (size_t i = 0; i < predicates.size(); ++i) {
736
54811
      mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i]));
737
54811
    }
738
19253
  }
739

            
740
20658
  return filtered_clones;
741
20658
}
742

            
743
void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
744
                              LocalityWeightsConstSharedPtr locality_weights,
745
                              const HostVector& hosts_added, const HostVector& hosts_removed,
746
                              absl::optional<bool> weighted_priority_health,
747
52978
                              absl::optional<uint32_t> overprovisioning_factor) {
748
52978
  if (weighted_priority_health.has_value()) {
749
52260
    weighted_priority_health_ = weighted_priority_health.value();
750
52260
  }
751
52979
  if (overprovisioning_factor.has_value()) {
752
52204
    ASSERT(overprovisioning_factor.value() > 0);
753
52204
    overprovisioning_factor_ = overprovisioning_factor.value();
754
52204
  }
755
52978
  hosts_ = std::move(update_hosts_params.hosts);
756
52978
  healthy_hosts_ = std::move(update_hosts_params.healthy_hosts);
757
52978
  degraded_hosts_ = std::move(update_hosts_params.degraded_hosts);
758
52978
  excluded_hosts_ = std::move(update_hosts_params.excluded_hosts);
759
52978
  hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality);
760
52978
  healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality);
761
52978
  degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality);
762
52978
  excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality);
763
52978
  locality_weights_ = std::move(locality_weights);
764

            
765
52978
  runUpdateCallbacks(hosts_added, hosts_removed);
766
52978
}
767

            
768
// Bundles the given host lists into an UpdateHostsParams value.
// Note that the initializer order below is not the parameter order: the four flat lists come
// first, followed by the four per-locality lists.
PrioritySet::UpdateHostsParams
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
                               HostsPerLocalityConstSharedPtr hosts_per_locality,
                               HealthyHostVectorConstSharedPtr healthy_hosts,
                               HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
                               DegradedHostVectorConstSharedPtr degraded_hosts,
                               HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
                               ExcludedHostVectorConstSharedPtr excluded_hosts,
                               HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) {
  return PrioritySet::UpdateHostsParams{
      // Flat lists.
      std::move(hosts), std::move(healthy_hosts), std::move(degraded_hosts),
      std::move(excluded_hosts),
      // Per-locality lists.
      std::move(hosts_per_locality), std::move(healthy_hosts_per_locality),
      std::move(degraded_hosts_per_locality), std::move(excluded_hosts_per_locality)};
}
786

            
787
17516
// Builds an UpdateHostsParams value from the host lists currently held by `host_set`.
PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) {
  return updateHostsParams(
      // All hosts.
      host_set.hostsPtr(), host_set.hostsPerLocalityPtr(),
      // Healthy hosts.
      host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(),
      // Degraded hosts.
      host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(),
      // Excluded hosts.
      host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr());
}
793
// Partitions `hosts` and `hosts_per_locality` with the ClusterImplBase partition helpers and
// packs the three resulting lists of each (passed on as the healthy, degraded and excluded
// lists) together with the inputs into an UpdateHostsParams value.
PrioritySet::UpdateHostsParams
HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts,
                            HostsPerLocalityConstSharedPtr hosts_per_locality) {
  auto [healthy, degraded, excluded] = ClusterImplBase::partitionHostList(*hosts);
  auto [healthy_per_locality, degraded_per_locality, excluded_per_locality] =
      ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality);

  return updateHostsParams(std::move(hosts), std::move(hosts_per_locality), std::move(healthy),
                           std::move(healthy_per_locality), std::move(degraded),
                           std::move(degraded_per_locality), std::move(excluded),
                           std::move(excluded_per_locality));
}
808

            
809
// Returns the host set for `priority`, creating it first if needed. Host sets are stored
// densely by priority, so every missing priority up to and including the requested one is
// created too. Each new host set gets a priority update callback that forwards to this
// priority set's reference update callbacks.
const HostSet&
PrioritySetImpl::getOrCreateHostSet(uint32_t priority,
                                    absl::optional<bool> weighted_priority_health,
                                    absl::optional<uint32_t> overprovisioning_factor) {
  // Fast path: the host set already exists.
  if (!(host_sets_.size() < priority + 1)) {
    return *host_sets_[priority];
  }

  for (size_t next_priority = host_sets_.size(); next_priority <= priority; ++next_priority) {
    HostSetImplPtr created =
        createHostSet(next_priority, weighted_priority_health, overprovisioning_factor);
    auto forward_update = [this](uint32_t updated_priority, const HostVector& hosts_added,
                                 const HostVector& hosts_removed) {
      return runReferenceUpdateCallbacks(updated_priority, hosts_added, hosts_removed);
    };
    // The callback handle is stored alongside the host sets.
    host_sets_priority_update_cbs_.push_back(created->addPriorityUpdateCb(forward_update));
    host_sets_.push_back(std::move(created));
  }

  return *host_sets_[priority];
}
826

            
827
void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
828
                                  LocalityWeightsConstSharedPtr locality_weights,
829
                                  const HostVector& hosts_added, const HostVector& hosts_removed,
830
                                  absl::optional<bool> weighted_priority_health,
831
                                  absl::optional<uint32_t> overprovisioning_factor,
832
52091
                                  HostMapConstSharedPtr cross_priority_host_map) {
833
  // Update cross priority host map first. In this way, when the update callbacks of the priority
834
  // set are executed, the latest host map can always be obtained.
835
52091
  if (cross_priority_host_map != nullptr) {
836
33852
    const_cross_priority_host_map_ = std::move(cross_priority_host_map);
837
33852
  }
838

            
839
  // Ensure that we have a HostSet for the given priority.
840
52091
  getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor);
841
52091
  static_cast<HostSetImpl*>(host_sets_[priority].get())
842
52091
      ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added,
843
52091
                    hosts_removed, weighted_priority_health, overprovisioning_factor);
844

            
845
52091
  if (!batch_update_) {
846
51276
    runUpdateCallbacks(hosts_added, hosts_removed);
847
51276
  }
848
52091
}
849

            
850
801
// Runs `callback` inside a batch update scope and afterwards fires the update callbacks once
// with the net set of hosts added and removed across the whole batch.
void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) {
  // The scope records every host added or removed by the updates the callback performs.
  BatchUpdateScope batch_scope(*this);
  callback.batchUpdate(batch_scope);

  // Now that all the updates are complete, compute the diff: hosts that were both added and
  // removed during the batch cancel out.
  const HostVector net_added =
      filterHosts(batch_scope.all_hosts_added_, batch_scope.all_hosts_removed_);
  const HostVector net_removed =
      filterHosts(batch_scope.all_hosts_removed_, batch_scope.all_hosts_added_);

  runUpdateCallbacks(net_added, net_removed);
}
862

            
863
void PrioritySetImpl::BatchUpdateScope::updateHosts(
864
    uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
865
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
866
    const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
867
796
    absl::optional<uint32_t> overprovisioning_factor) {
868
  // We assume that each call updates a different priority.
869
796
  ASSERT(priorities_.find(priority) == priorities_.end());
870
796
  priorities_.insert(priority);
871

            
872
1024
  for (const auto& host : hosts_added) {
873
984
    all_hosts_added_.insert(host);
874
984
  }
875

            
876
796
  for (const auto& host : hosts_removed) {
877
178
    all_hosts_removed_.insert(host);
878
178
  }
879

            
880
796
  parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added,
881
796
                      hosts_removed, weighted_priority_health, overprovisioning_factor);
882
796
}
883

            
884
void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
885
                                      LocalityWeightsConstSharedPtr locality_weights,
886
                                      const HostVector& hosts_added,
887
                                      const HostVector& hosts_removed,
888
                                      absl::optional<bool> weighted_priority_health,
889
                                      absl::optional<uint32_t> overprovisioning_factor,
890
17857
                                      HostMapConstSharedPtr cross_priority_host_map) {
891
17857
  ASSERT(cross_priority_host_map == nullptr,
892
17857
         "External cross-priority host map is meaningless to MainPrioritySetImpl");
893
17857
  updateCrossPriorityHostMap(priority, hosts_added, hosts_removed);
894

            
895
17857
  PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights,
896
17857
                               hosts_added, hosts_removed, weighted_priority_health,
897
17857
                               overprovisioning_factor);
898
17857
}
899

            
900
18554
// Returns the read-only cross priority host map. If host updates have built a pending mutable
// map since the last call, that map is published as the read-only one first.
HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
  const bool has_pending_update = mutable_cross_priority_host_map_ != nullptr;
  if (has_pending_update) {
    // Moving hands the pending map over and leaves the mutable pointer empty, so the next host
    // update starts from a fresh copy.
    const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
    ASSERT(mutable_cross_priority_host_map_ == nullptr);
  }
  return const_cross_priority_host_map_;
}
908

            
909
void MainPrioritySetImpl::updateCrossPriorityHostMap(uint32_t priority,
910
                                                     const HostVector& hosts_added,
911
17857
                                                     const HostVector& hosts_removed) {
912
17857
  if (hosts_added.empty() && hosts_removed.empty()) {
913
    // No new hosts have been added and no old hosts have been removed.
914
267
    return;
915
267
  }
916

            
917
  // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes,
918
  // we cannot directly modify read_only_all_host_map_.
919
17590
  if (mutable_cross_priority_host_map_ == nullptr) {
920
    // Copy old read only host map to mutable host map.
921
17465
    mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
922
17465
  }
923

            
924
17590
  for (const auto& host : hosts_removed) {
925
267
    const auto host_address = addressToString(host->address());
926
267
    const auto existing_host = mutable_cross_priority_host_map_->find(host_address);
927
267
    if (existing_host != mutable_cross_priority_host_map_->end()) {
928
      // Only delete from the current priority to protect from situations where
929
      // the add operation was already executed and has already moved the metadata of the host
930
      // from a higher priority value to a lower priority value.
931
261
      if (existing_host->second->priority() == priority) {
932
240
        mutable_cross_priority_host_map_->erase(host_address);
933
240
      }
934
261
    }
935
267
  }
936

            
937
18249
  for (const auto& host : hosts_added) {
938
18232
    mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
939
18232
  }
940
17590
}
941

            
942
// Creates the cluster traffic stats in `scope` via Stats::createDeferredCompatibleStats,
// passing `defer_creation` through to it.
DeferredCreationCompatibleClusterTrafficStats
ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope,
                               const ClusterTrafficStatNames& stat_names, bool defer_creation) {
  return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(
      scope, stat_names, defer_creation);
}
948

            
949
// Creates the cluster request/response size stats from `stat_names` in `scope`.
ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats(
    Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) {
  return ClusterRequestResponseSizeStats{stat_names, scope};
}
953

            
954
// Creates the cluster load report stats from `stat_names` in `scope`.
ClusterLoadReportStats
ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope,
                                         const ClusterLoadReportStatNames& stat_names) {
  return ClusterLoadReportStats{stat_names, scope};
}
959

            
960
// Creates the cluster timeout budget stats from `stat_names` in `scope`.
ClusterTimeoutBudgetStats
ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope,
                                            const ClusterTimeoutBudgetStatNames& stat_names) {
  return ClusterTimeoutBudgetStats{stat_names, scope};
}
965

            
966
// Returns the HTTP protocol options for a cluster. If `options` is already set it is returned
// as is. Otherwise the options are built from the protocol fields of the cluster config.
//
// Returns an InvalidArgument error when the cluster uses the configured protocol selection
// and has both HTTP/1 and HTTP/2 options set, and forwards any error reported while building
// the options.
absl::StatusOr<std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>>
createOptions(const envoy::config::cluster::v3::Cluster& config,
              std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options,
              Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
  if (options) {
    return std::move(options);
  }

  // Make sure multiple protocol configurations are not present.
  const bool uses_configured_protocol =
      config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL;
  if (uses_configured_protocol && config.has_http_protocol_options() &&
      config.has_http2_protocol_options()) {
    // The message has no format arguments, so it is passed directly.
    return absl::InvalidArgumentError("cluster: Both HTTP1 and HTTP2 options may only be "
                                      "configured with non-default 'protocol_selection' values");
  }

  auto options_or_error =
      ClusterInfoImpl::HttpProtocolOptionsConfigImpl::createProtocolOptionsConfig(
          config.http_protocol_options(), config.http2_protocol_options(),
          config.common_http_protocol_options(),
          (config.has_upstream_http_protocol_options()
               ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>(
                     config.upstream_http_protocol_options())
               : absl::nullopt),
          config.protocol_selection() ==
              envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL,
          config.has_http2_protocol_options(), factory_context.serverFactoryContext(),
          factory_context.messageValidationVisitor());
  RETURN_IF_NOT_OK_REF(options_or_error.status());
  // Move the result out rather than copying the shared pointer, matching how the load
  // balancer config helpers in this file return their StatusOr values.
  return std::move(options_or_error.value());
}
998

            
999
// Maps the legacy `lb_policy` enum of a cluster to the matching typed load balancer factory
// and loads the typed config from the legacy cluster fields. Subset configuration is not
// considered here.
//
// Returns an InvalidArgument error when no factory is found for the policy, and forwards any
// error from loading the legacy config. LOAD_BALANCING_POLICY_CONFIG must never reach this
// function and panics.
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset(
    Server::Configuration::ServerFactoryContext& factory_context, const ClusterProto& cluster) {
  // Pick the factory name first; the registry lookup happens once below.
  const char* factory_name = nullptr;
  switch (cluster.lb_policy()) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case ClusterProto::ROUND_ROBIN:
    factory_name = "envoy.load_balancing_policies.round_robin";
    break;
  case ClusterProto::LEAST_REQUEST:
    factory_name = "envoy.load_balancing_policies.least_request";
    break;
  case ClusterProto::RANDOM:
    factory_name = "envoy.load_balancing_policies.random";
    break;
  case ClusterProto::RING_HASH:
    factory_name = "envoy.load_balancing_policies.ring_hash";
    break;
  case ClusterProto::MAGLEV:
    factory_name = "envoy.load_balancing_policies.maglev";
    break;
  case ClusterProto::CLUSTER_PROVIDED:
    factory_name = "envoy.load_balancing_policies.cluster_provided";
    break;
  case ClusterProto::LOAD_BALANCING_POLICY_CONFIG:
    // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
    // function and should not reach here.
    PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here");
    break;
  }

  // An enum value not covered above leaves the name unset and is reported like a missing
  // factory.
  TypedLoadBalancerFactory* lb_factory =
      factory_name == nullptr
          ? nullptr
          : Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(factory_name);
  if (lb_factory == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("No load balancer factory found for LB type: {}",
                    ClusterProto::LbPolicy_Name(cluster.lb_policy())));
  }

  auto typed_config_or_error = lb_factory->loadLegacy(factory_context, cluster);
  RETURN_IF_NOT_OK_REF(typed_config_or_error.status());
  return Result{lb_factory, std::move(typed_config_or_error.value())};
}
// Loads the typed load balancer config for a cluster from its legacy fields. The subset load
// balancer factory is used when the cluster has a subset config with at least one subset
// selector; otherwise the factory is chosen from `lb_policy` alone.
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
    Server::Configuration::ServerFactoryContext& factory_context, const ClusterProto& cluster) {
  // Note it is possible to have a lb_subset_config without actually having any
  // subset selectors. In this case the subset load balancer should not be used.
  const bool wants_subset_lb =
      cluster.has_lb_subset_config() && !cluster.lb_subset_config().subset_selectors().empty();
  if (!wants_subset_lb) {
    return getTypedLbConfigFromLegacyProtoWithoutSubset(factory_context, cluster);
  }

  auto* subset_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
      "envoy.load_balancing_policies.subset");
  if (subset_factory == nullptr) {
    return absl::InvalidArgumentError("No subset load balancer factory found");
  }

  auto typed_config_or_error = subset_factory->loadLegacy(factory_context, cluster);
  RETURN_IF_NOT_OK_REF(typed_config_or_error.status());
  return Result{subset_factory, std::move(typed_config_or_error.value())};
}
// String-keyed map of parsed protocol options. In this file it is the value type unwrapped from
// parseExtensionProtocolOptions() when initializing ClusterInfoImpl::extension_protocol_options_,
// which ClusterInfoImpl::extensionProtocolOptions() later searches by name.
using ProtocolOptionsHashMap =
    absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>;
// Status-returning factory for ClusterInfoImpl.
//
// The constructor reports validation failures through an out-parameter status; this wrapper
// turns that into an absl::StatusOr so callers never observe a partially validated object.
//
// @return the newly built ClusterInfoImpl, or the error recorded by the constructor.
absl::StatusOr<std::unique_ptr<ClusterInfoImpl>>
ClusterInfoImpl::create(Init::Manager& info,
                        Server::Configuration::ServerFactoryContext& server_context,
                        const envoy::config::cluster::v3::Cluster& config,
                        const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
                        Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
                        Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
                        Server::Configuration::TransportSocketFactoryContext& ctx) {
  absl::Status ctor_status = absl::OkStatus();
  // `new` is used directly (rather than std::make_unique) exactly as the original code does.
  std::unique_ptr<ClusterInfoImpl> cluster_info(
      new ClusterInfoImpl(info, server_context, config, bind_config, runtime,
                          std::move(socket_matcher), std::move(stats_scope), added_via_api, ctx,
                          ctor_status));
  if (!ctor_status.ok()) {
    // The half-validated object is destroyed when `cluster_info` goes out of scope.
    return ctor_status;
  }
  return cluster_info;
}
// Builds the per-cluster info object from the Cluster proto.
//
// The member-initializer list copies/derives configuration values and creates the stats
// structures; the constructor body then performs validation and wires up load balancing,
// timeouts and the upstream network/HTTP filter factories.
//
// Error reporting: failures detected in the body are written to `creation_status` followed by an
// early return, so the object may be only partially set up when `creation_status` is not OK
// (ClusterInfoImpl::create() checks that status). Initializers wrapped in THROW_OR_RETURN_VALUE
// presumably throw on a non-OK status instead -- NOTE(review): confirm against the macro.
ClusterInfoImpl::ClusterInfoImpl(
    Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context,
    const envoy::config::cluster::v3::Cluster& config,
    const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
    Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
    Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
    Server::Configuration::TransportSocketFactoryContext& factory_context,
    absl::Status& creation_status)
18149
    : runtime_(runtime), name_(config.name()),
18149
      // Only allocated when alt_stat_name is non-empty.
      observability_name_(!config.alt_stat_name().empty()
18149
                              ? std::make_unique<std::string>(config.alt_stat_name())
18149
                              : nullptr),
18149
      eds_service_name_(
18149
          config.has_eds_cluster_config()
18149
              ? std::make_unique<std::string>(config.eds_cluster_config().service_name())
18149
              : nullptr),
18149
      extension_protocol_options_(THROW_OR_RETURN_VALUE(
          parseExtensionProtocolOptions(config, factory_context), ProtocolOptionsHashMap)),
18149
      http_protocol_options_(THROW_OR_RETURN_VALUE(
          createOptions(config,
                        extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>(
                            "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"),
                        factory_context),
          std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>)),
18149
      tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>(
18149
          "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")),
18149
      // The HttpProtocolOptions value is preferred; the Cluster-level field is the fallback.
      // Setting both is rejected in the constructor body below.
      max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          http_protocol_options_->common_http_protocol_options_, max_requests_per_connection,
          config.max_requests_per_connection().value())),
      // Defaults to 5000 ms when connect_timeout is not set.
      connect_timeout_(
18149
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))),
18149
      per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)),
18149
      peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(),
                                                       predictive_preconnect_ratio, 0)),
18149
      socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)),
18149
      traffic_stats_(generateStats(
18149
          stats_scope_, factory_context.serverFactoryContext().clusterManager().clusterStatNames(),
18149
          server_context.statsConfig().enableDeferredCreationStats())),
18149
      config_update_stats_(
18149
          factory_context.serverFactoryContext().clusterManager().clusterConfigUpdateStatNames(),
18149
          *stats_scope_),
18149
      lb_stats_(factory_context.serverFactoryContext().clusterManager().clusterLbStatNames(),
18149
                *stats_scope_),
18149
      endpoint_stats_(
18149
          factory_context.serverFactoryContext().clusterManager().clusterEndpointStatNames(),
18149
          *stats_scope_),
18149
      load_report_stats_store_(stats_scope_->symbolTable()),
18149
      load_report_stats_(generateLoadReportStats(
18149
          *load_report_stats_store_.rootScope(),
18149
          factory_context.serverFactoryContext().clusterManager().clusterLoadReportStatNames())),
      // Optional stats are only created when track_cluster_stats is present or
      // track_timeout_budgets is set.
      optional_cluster_stats_(
18149
          (config.has_track_cluster_stats() || config.track_timeout_budgets())
18149
              ? std::make_unique<OptionalClusterStats>(
40
                    config, *stats_scope_, factory_context.serverFactoryContext().clusterManager())
18149
              : nullptr),
18149
      features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures(
18149
          config, *http_protocol_options_)),
18149
      resource_managers_(config, runtime, name_, *stats_scope_,
18149
                         factory_context.serverFactoryContext()
18149
                             .clusterManager()
18149
                             .clusterCircuitBreakersStatNames()),
18149
      maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)),
      upstream_local_address_selector_(
18149
          THROW_OR_RETURN_VALUE(createUpstreamLocalAddressSelector(config, bind_config),
                                Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr)),
18149
      upstream_config_(config.has_upstream_config()
18149
                           ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>(
7
                                 config.upstream_config())
18149
                           : nullptr),
18149
      metadata_(config.has_metadata()
18149
                    ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata())
18149
                    : nullptr),
18149
      typed_metadata_(config.has_metadata()
18149
                          ? std::make_unique<ClusterTypedMetadata>(config.metadata())
18149
                          : nullptr),
      common_lb_config_(
18149
          factory_context.serverFactoryContext().clusterManager().getCommonLbConfigPtr(
18149
              config.common_lb_config())),
18149
      cluster_type_(config.has_cluster_type()
18149
                        ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>(
361
                              config.cluster_type())
18149
                        : nullptr),
      http_filter_config_provider_manager_(
18149
          Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
18149
              server_context)),
      network_filter_config_provider_manager_(
18149
          createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)),
18149
      upstream_context_(server_context, init_manager, *stats_scope_),
18149
      happy_eyeballs_config_(
18149
          config.upstream_connection_options().has_happy_eyeballs_config()
18149
              ? std::make_unique<
1
                    envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>(
1
                    config.upstream_connection_options().happy_eyeballs_config())
18149
              : nullptr),
18149
      lrs_report_metric_names_(!config.lrs_report_endpoint_metrics().empty()
18149
                                   ? std::make_unique<Envoy::Orca::LrsReportMetricNames>(
8
                                         config.lrs_report_endpoint_metrics().begin(),
8
                                         config.lrs_report_endpoint_metrics().end())
18149
                                   : nullptr),
18149
      shadow_policies_(http_protocol_options_->shadow_policies_),
      // Defaults to 1024 * 1024 bytes when per_connection_buffer_limit_bytes is not set.
      per_connection_buffer_limit_bytes_(
18149
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)),
18149
      buffer_high_watermark_timeout_(std::chrono::milliseconds(
18149
          PROTOBUF_GET_MS_OR_DEFAULT(config, per_connection_buffer_high_watermark_timeout, 0))),
18149
      // Explicit max_headers_count wins; otherwise the runtime override key is consulted, with
      // Http::DEFAULT_MAX_HEADERS_COUNT as the final fallback.
      max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          http_protocol_options_->common_http_protocol_options_, max_headers_count,
          runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey,
                                         Http::DEFAULT_MAX_HEADERS_COUNT))),
18149
      // Explicit max_response_headers_kb wins; otherwise the runtime override is used, where a
      // runtime value of 0 is treated as "unspecified" and yields absl::nullopt.
      max_response_headers_kb_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          http_protocol_options_->common_http_protocol_options_, max_response_headers_kb,
          [&]() -> absl::optional<uint16_t> {
            constexpr uint64_t unspecified = 0;
            uint64_t runtime_val = runtime_.snapshot().getInteger(
                Http::MaxResponseHeadersSizeOverrideKey, unspecified);
            if (runtime_val == unspecified) {
              return absl::nullopt;
            }
            return runtime_val;
          }())),
18149
      type_(config.type()),
18149
      // Note the name mismatch: this member is populated from ignore_health_on_host_removal.
      drain_connections_on_host_removal_(config.ignore_health_on_host_removal()),
      connection_pool_per_downstream_connection_(
18149
          config.connection_pool_per_downstream_connection()),
18149
      // Host warming requires both configured health checks and the
      // ignore_new_hosts_until_first_hc common LB option.
      warm_hosts_(!config.health_checks().empty() &&
18149
                  common_lb_config_->ignore_new_hosts_until_first_hc()),
      set_local_interface_name_on_upstream_connections_(
18149
          config.upstream_connection_options().set_local_interface_name_on_upstream_connections()),
18149
      added_via_api_(added_via_api),
18149
      per_endpoint_stats_(config.has_track_cluster_stats() &&
18149
                          config.track_cluster_stats().per_endpoint_stats()) {
#ifdef WIN32
  if (set_local_interface_name_on_upstream_connections_) {
    creation_status = absl::InvalidArgumentError(
        "set_local_interface_name_on_upstream_connections_ cannot be set to true "
        "on Windows platforms");
    return;
  }
#endif
  // Both LoadStatsReporter interface implementations and per_endpoint_stats need to `latch()` the
  // counters, so if both are
  // configured they will interfere with each other and both get incorrect values.
  // TODO(ggreenway): Verify that bypassing virtual dispatch here was intentional
18149
  if (ClusterInfoImpl::perEndpointStatsEnabled() &&
18149
      server_context.bootstrap().cluster_manager().has_load_stats_config()) {
2
    creation_status =
2
        absl::InvalidArgumentError("Only one of cluster per_endpoint_stats and cluster manager "
2
                                   "load_stats_config can be specified");
2
    return;
2
  }
18147
  // max_requests_per_connection may be given on the Cluster or in HttpProtocolOptions, not both.
  if (config.has_max_requests_per_connection() &&
18147
      http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) {
1
    creation_status =
1
        absl::InvalidArgumentError("Only one of max_requests_per_connection from Cluster or "
1
                                   "HttpProtocolOptions can be specified");
1
    return;
1
  }
18146
  if (config.has_load_balancing_policy() ||
18146
      config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) {
    // If load_balancing_policy is set we will use it directly, ignoring lb_policy.
5080
    SET_AND_RETURN_IF_NOT_OK(configureLbPolicies(config, server_context), creation_status);
17668
  } else {
    // If load_balancing_policy is not set, we will try to convert legacy lb_policy
    // to load_balancing_policy and use it.
13066
    auto lb_pair =
13066
        LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(server_context, config);
13066
    SET_AND_RETURN_IF_NOT_OK(lb_pair.status(), creation_status);
13064
    load_balancer_factory_ = lb_pair->factory;
13064
    ASSERT(load_balancer_factory_ != nullptr, "null load balancer factory");
13064
    load_balancer_config_ = std::move(lb_pair->config);
13064
  }
18139
  if (config.lb_subset_config().locality_weight_aware() &&
18139
      !config.common_lb_config().has_locality_weighted_lb_config()) {
1
    creation_status =
1
        absl::InvalidArgumentError(fmt::format("Locality weight aware subset LB requires that a "
1
                                               "locality_weighted_lb_config be set in {}",
1
                                               name_));
1
    return;
1
  }
  // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no
  // timeout should be used.
18138
  absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1));
18138
  if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) {
52
    idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
52
        http_protocol_options_->common_http_protocol_options_.idle_timeout()));
52
    if (idle_timeout.value().count() == 0) {
4
      idle_timeout = absl::nullopt;
4
    }
52
  }
18138
  if (idle_timeout.has_value()) {
18107
    optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout);
18107
  }
  // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating
  // that no timeout should be used.
18138
  absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10));
18138
  if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) {
5
    tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout();
5
    if (tcp_pool_idle_timeout.value().count() == 0) {
2
      tcp_pool_idle_timeout = absl::nullopt;
2
    }
5
  }
18138
  if (tcp_pool_idle_timeout.has_value()) {
18109
    optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout);
18109
  }
  // Use configured `max_connection_duration`, unless it's set to 0, indicating that
  // no timeout should be used. No timeout by default either.
18138
  absl::optional<std::chrono::milliseconds> max_connection_duration;
18138
  if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) {
15
    max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
15
        http_protocol_options_->common_http_protocol_options_.max_connection_duration()));
15
    if (max_connection_duration.value().count() == 0) {
2
      max_connection_duration = absl::nullopt;
2
    }
15
  }
18138
  if (max_connection_duration.has_value()) {
13
    optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration);
13
  }
18138
  // eds_cluster_config is only accepted on clusters of type EDS.
  if (config.has_eds_cluster_config()) {
613
    if (config.type() != envoy::config::cluster::v3::Cluster::EDS) {
2
      creation_status = absl::InvalidArgumentError("eds_cluster_config set in a non-EDS cluster");
2
      return;
2
    }
613
  }
  // TODO(htuch): Remove this temporary workaround when we have
  // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides
  // early validation of sanity of fields that we should catch at config ingestion.
  // The converted value is intentionally discarded; only the conversion itself matters here.
18136
  DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window());
  // Create upstream network filter factories
18136
  const auto& filters = config.filters();
18136
  ASSERT(filter_factories_.empty());
18136
  filter_factories_.reserve(filters.size());
18188
  for (ssize_t i = 0; i < filters.size(); i++) {
53
    const auto& proto_config = filters[i];
53
    // The last configured filter is flagged as terminal for the dynamic provider below.
    const bool is_terminal = i == filters.size() - 1;
53
    ENVOY_LOG(debug, "  upstream network filter #{}:", i);
53
    // Dynamic (config_discovery) filters: mutually exclusive with an inline typed_config.
    if (proto_config.has_config_discovery()) {
37
      if (proto_config.has_typed_config()) {
1
        creation_status =
1
            absl::InvalidArgumentError("Only one of typed_config or config_discovery can be used");
1
        return;
1
      }
36
      ENVOY_LOG(debug, "      dynamic filter name: {}", proto_config.name());
36
      filter_factories_.push_back(
36
          network_filter_config_provider_manager_->createDynamicFilterConfigProvider(
36
              proto_config.config_discovery(), proto_config.name(), server_context,
36
              upstream_context_, factory_context.serverFactoryContext().clusterManager(),
36
              is_terminal, "network", nullptr));
36
      continue;
37
    }
16
    // Static filters: translate the typed_config into the factory's proto and build the
    // filter factory callback immediately.
    ENVOY_LOG(debug, "    name: {}", proto_config.name());
16
    auto& factory = Config::Utility::getAndCheckFactory<
16
        Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config);
16
    auto message = factory.createEmptyConfigProto();
16
    SET_AND_RETURN_IF_NOT_OK(
16
        Config::Utility::translateOpaqueConfig(
16
            proto_config.typed_config(), factory_context.messageValidationVisitor(), *message),
16
        creation_status);
16
    Network::FilterFactoryCb callback =
16
        factory.createFilterFactoryFromProto(*message, upstream_context_);
16
    filter_factories_.push_back(
16
        network_filter_config_provider_manager_->createStaticFilterConfigProvider(
16
            callback, proto_config.name()));
16
  }
18135
  // Upstream HTTP filters: validate the configured list, then build the filter factories.
  if (http_protocol_options_) {
18108
    if (!http_protocol_options_->http_filters_.empty()) {
443
      creation_status = Http::FilterChainUtility::checkUpstreamHttpFiltersList(
443
          http_protocol_options_->http_filters_);
443
      if (!creation_status.ok()) {
        return;
      }
443
      std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix());
443
      Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
443
                              Server::Configuration::UpstreamHttpFilterConfigFactory>
443
          helper(*http_filter_config_provider_manager_, upstream_context_.serverFactoryContext(),
443
                 factory_context.serverFactoryContext().clusterManager(), upstream_context_,
443
                 prefix);
443
      SET_AND_RETURN_IF_NOT_OK(helper.processFilters(http_protocol_options_->http_filters_,
443
                                                     "upstream http", "upstream http",
443
                                                     http_filter_factories_),
443
                               creation_status);
443
    }
18108
  }
18135
}
// Selects and configures the load balancer from `config.load_balancing_policy`.
//
// Policies are tried in the order they are listed; the first one with a registered
// TypedLoadBalancerFactory is used and the remaining entries are ignored. On success
// `load_balancer_factory_` and `load_balancer_config_` are populated.
//
// @param config the cluster proto; must have load_balancing_policy set and must not combine it
//        with lb_subset_config or the legacy-only parts of common_lb_config.
// @param context server factory context passed to the factory and used for proto validation.
// @return OkStatus on success, otherwise an InvalidArgument status (or the error reported while
//         translating/loading the policy config).
absl::Status
ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
                                     Server::Configuration::ServerFactoryContext& context) {
  // load_balancing_policy is mandatory on this path.
  if (!config.has_load_balancing_policy()) {
    return absl::InvalidArgumentError("cluster: field load_balancing_policy need to be set");
  }

  if (config.has_lb_subset_config()) {
    return absl::InvalidArgumentError(
        "cluster: load_balancing_policy cannot be combined with lb_subset_config");
  }

  if (config.has_common_lb_config()) {
    const auto& common = config.common_lb_config();
    const bool uses_legacy_only_fields = common.has_zone_aware_lb_config() ||
                                         common.has_locality_weighted_lb_config() ||
                                         common.has_consistent_hashing_lb_config();
    if (uses_legacy_only_fields) {
      return absl::InvalidArgumentError(
          "cluster: load_balancing_policy cannot be combined with partial fields "
          "(zone_aware_lb_config, "
          "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config");
    }
  }

  // Names of policies that were skipped because no factory is registered for them; only used
  // to build the error message when nothing matches.
  absl::InlinedVector<absl::string_view, 4> unregistered_names;
  for (const auto& candidate : config.load_balancing_policy().policies()) {
    const auto& extension = candidate.typed_extension_config();
    TypedLoadBalancerFactory* found =
        Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>(extension,
                                                                      /*is_optional=*/true);
    if (found == nullptr) {
      unregistered_names.push_back(extension.name());
      continue;
    }

    // Load and validate the configuration of the first registered policy, then stop.
    auto typed_proto = found->createEmptyConfigProto();
    RETURN_IF_NOT_OK(Config::Utility::translateOpaqueConfig(
        extension.typed_config(), context.messageValidationVisitor(), *typed_proto));
    load_balancer_factory_ = found;
    auto loaded_config = found->loadConfig(context, *typed_proto);
    RETURN_IF_NOT_OK_REF(loaded_config.status());
    load_balancer_config_ = std::move(loaded_config.value());
    break;
  }

  if (load_balancer_factory_ == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("cluster: didn't find a registered load balancer factory "
                    "implementation for cluster: '{}' with names from [{}]",
                    name_, absl::StrJoin(unregistered_names, ", ")));
  }
  return absl::OkStatus();
}
// Looks up the parsed protocol options stored under `name`.
//
// @param name key into extension_protocol_options_.
// @return the stored options, or nullptr when nothing is registered under that name.
ProtocolOptionsConfigConstSharedPtr
ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const {
  const auto entry = extension_protocol_options_.find(name);
  if (entry == extension_protocol_options_.end()) {
    return nullptr;
  }
  return entry->second;
}
// Creates the upstream transport socket factory for a cluster.
//
// When the cluster has no transport_socket configured, a raw-buffer transport socket
// ("envoy.transport_sockets.raw_buffer") is synthesized and used instead.
//
// @param config the cluster proto.
// @param factory_context transport socket factory context; also supplies the validation visitor.
// @return whatever the resolved config factory's createTransportSocketFactory() returns.
absl::StatusOr<Network::UpstreamTransportSocketFactoryPtr> createTransportSocketFactory(
    const envoy::config::cluster::v3::Cluster& config,
    Server::Configuration::TransportSocketFactoryContext& factory_context) {
  // Work on a copy so the default can be filled in without touching the caller's proto.
  auto effective_socket = config.transport_socket();
  if (!config.has_transport_socket()) {
    envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer default_raw_buffer;
    effective_socket.set_name("envoy.transport_sockets.raw_buffer");
    effective_socket.mutable_typed_config()->PackFrom(default_raw_buffer);
  }

  auto& socket_config_factory = Config::Utility::getAndCheckFactory<
      Server::Configuration::UpstreamTransportSocketConfigFactory>(effective_socket);
  ProtobufTypes::MessagePtr factory_proto = Config::Utility::translateToFactoryConfig(
      effective_socket, factory_context.messageValidationVisitor(), socket_config_factory);
  return socket_config_factory.createTransportSocketFactory(*factory_proto, factory_context);
}
30281
// Applies every configured upstream network filter factory to `connection`, in configuration
// order. Providers whose config() currently holds no value are skipped.
void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
  for (const auto& provider : filter_factories_) {
    auto current_config = provider->config();
    if (!current_config.has_value()) {
      continue;
    }
    Network::FilterFactoryCb& install_filter = current_config.value();
    install_filter(connection);
  }
}
std::vector<Http::Protocol>
47490
ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
47490
  if (downstream_protocol.has_value() &&
47490
      features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
14
    if (downstream_protocol.value() == Http::Protocol::Http3 &&
14
        !(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1
      return {Http::Protocol::Http2};
1
    }
    // use HTTP11 since HTTP10 upstream is not supported yet.
13
    if (downstream_protocol.value() == Http::Protocol::Http10) {
2
      return {Http::Protocol::Http11};
2
    }
11
    return {downstream_protocol.value()};
13
  }
47476
  if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
94
    if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
37
      return {Http::Protocol::Http2, Http::Protocol::Http11};
37
    }
57
    return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11};
94
  }
47382
  if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
1453
    return {Http::Protocol::Http3};
1453
  }
45929
  return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
45929
                                                               : Http::Protocol::Http11};
47382
}
// Evaluates the configured outlier-detection HTTP error matcher against upstream response
// headers.
//
// @param headers the upstream response headers to match.
// @return absl::nullopt when no matcher is configured; otherwise whether the matcher reports a
//         match for these headers.
absl::optional<bool>
ClusterInfoImpl::processHttpForOutlierDetection(Http::ResponseHeaderMap& headers) const {
  const auto& matchers = http_protocol_options_->outlier_detection_http_error_matcher_;
  if (matchers.empty()) {
    return absl::nullopt;
  }

  // One status slot per configured matcher. The vector is constructed at its final size in one
  // step; the previous reserve() followed by assignment from a temporary was redundant.
  Extensions::Common::Matcher::Matcher::MatchStatusVector statuses(matchers.size());

  // Only the first matcher is driven directly.
  // NOTE(review): presumably it is the root of the matcher tree and fans out to the others --
  // confirm against the matcher implementation.
  const auto& first_matcher = matchers[0];
  first_matcher->onNewStream(statuses);
  // Run matchers.
  first_matcher->onHttpResponseHeaders(headers, statuses);
  return absl::optional<bool>(first_matcher->matchStatus(statuses).matches_);
}
// Reports whether `transport_socket` is usable for QUIC.
//
// A transport socket qualifies when it is the QUIC transport socket itself, or an HTTP/1.1 proxy
// transport socket that wraps the QUIC transport socket.
//
// @return true/false as described above, or the error status from unpacking the HTTP/1.1 proxy
//         socket's typed_config.
absl::StatusOr<bool> validateTransportSocketSupportsQuic(
    const envoy::config::core::v3::TransportSocket& transport_socket) {
  const auto& outer_name = transport_socket.name();
  if (outer_name == "envoy.transport_sockets.quic") {
    return true;
  }

  if (outer_name == "envoy.transport_sockets.http_11_proxy") {
    // Look one level down: the proxy socket is acceptable only if it wraps a QUIC socket.
    envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport
        proxy_socket;
    RETURN_IF_NOT_OK(MessageUtil::unpackTo(transport_socket.typed_config(), proxy_socket));
    return proxy_socket.transport_socket().name() == "envoy.transport_sockets.quic";
  }

  return false;
}
// Common construction logic for cluster implementations.
//
// In order: creates the stats scope and transport socket factory context, builds the transport
// socket factory and the transport socket matcher, creates the ClusterInfoImpl, validates ALPN
// and HTTP/3 configuration against the transport socket, registers a priority-set callback that
// maintains the membership stats, and finally parses the drop-overload configuration.
//
// Every failure is reported by assigning `creation_status` and returning early, which leaves the
// remaining steps undone; callers are expected to check `creation_status` after construction.
ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
                                 ClusterFactoryContext& cluster_context,
                                 absl::Status& creation_status)
18104
    : init_manager_(fmt::format("Cluster {}", cluster.name())),
18104
      init_watcher_("ClusterImplBase", [this]() { onInitDone(); }),
18104
      runtime_(cluster_context.serverFactoryContext().runtime()),
18104
      // Defaults to true when wait_for_warm_on_init is not set.
      wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)),
18104
      random_(cluster_context.serverFactoryContext().api().randomGenerator()),
      // True when this cluster's name equals the cluster manager's local cluster name.
      local_cluster_(
18104
          cluster_context.serverFactoryContext().clusterManager().localClusterName().value_or("") ==
18104
          cluster.name()),
18104
      const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool(
18104
          cluster_context.serverFactoryContext().singletonManager(),
18104
          cluster_context.serverFactoryContext().mainThreadDispatcher())),
18104
      const_locality_shared_pool_(LocalityPool::getConstLocalitySharedPool(
18104
          cluster_context.serverFactoryContext().singletonManager(),
18104
          cluster_context.serverFactoryContext().mainThreadDispatcher())) {
18104
  auto& server_context = cluster_context.serverFactoryContext();
18104
  auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store());
18104
  transport_factory_context_ =
18104
      std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
18104
          server_context, *stats_scope, cluster_context.messageValidationVisitor());
18104
  transport_factory_context_->setInitManager(init_manager_);
18104
  auto socket_factory_or_error = createTransportSocketFactory(cluster, *transport_factory_context_);
18104
  SET_AND_RETURN_IF_NOT_OK(socket_factory_or_error.status(), creation_status);
18103
  // Keep a non-owning pointer for the ALPN check further down; the factory itself is handed to
  // TransportSocketMatcherImpl::create() below.
  // NOTE(review): presumably the matcher takes ownership of the factory -- confirm.
  auto* raw_factory_pointer = socket_factory_or_error.value().get();
18103
  OptRef<const xds::type::matcher::v3::Matcher> matcher;
18103
  if (cluster.has_transport_socket_matcher()) {
2
    matcher = makeOptRefFromPtr(&cluster.transport_socket_matcher());
2
  }
18103
  auto socket_matcher_or_error = TransportSocketMatcherImpl::create(
18103
      cluster.transport_socket_matches(), matcher, *transport_factory_context_,
18103
      socket_factory_or_error.value(), *stats_scope);
18103
  SET_AND_RETURN_IF_NOT_OK(socket_matcher_or_error.status(), creation_status);
18103
  auto socket_matcher = std::move(*socket_matcher_or_error);
18103
  // Queried now because `socket_matcher` is moved into ClusterInfoImpl::create() below.
  const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn();
18103
  auto& dispatcher = server_context.mainThreadDispatcher();
18103
  auto info_or_error =
18103
      ClusterInfoImpl::create(init_manager_, server_context, cluster,
18103
                              cluster_context.serverFactoryContext().clusterManager().bindConfig(),
18103
                              runtime_, std::move(socket_matcher), std::move(stats_scope),
18103
                              cluster_context.addedViaApi(), *transport_factory_context_);
18103
  SET_AND_RETURN_IF_NOT_OK(info_or_error.status(), creation_status);
18089
  // Custom deleter: instead of deleting the ClusterInfoImpl directly when the last reference
  // goes away, hand it to the dispatcher via deleteInDispatcherThread().
  info_ = std::shared_ptr<const ClusterInfoImpl>(
18089
      (*info_or_error).release(), [&dispatcher](const ClusterInfoImpl* self) {
18062
        ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
18062
        dispatcher.deleteInDispatcherThread(
18062
            std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
18062
      });
18089
  // ALPN requires both the default transport socket and (unless the runtime key disables the
  // check) every transport socket match to support ALPN.
  if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) {
66
    if (!raw_factory_pointer->supportsAlpn()) {
1
      creation_status = absl::InvalidArgumentError(
1
          fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}",
1
                      cluster.name(), cluster.DebugString()));
1
      return;
1
    }
65
    if (!matcher_supports_alpn &&
65
        !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) {
1
      creation_status = absl::InvalidArgumentError(fmt::format(
1
          "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}",
1
          cluster.name(), cluster.DebugString()));
1
      return;
1
    }
65
  }
18087
  // HTTP/3 requires a QUIC-capable transport socket, and a build with ENVOY_ENABLE_QUIC.
  if (info_->features() & ClusterInfoImpl::Features::HTTP3) {
1061
#if defined(ENVOY_ENABLE_QUIC)
1061
    absl::StatusOr<bool> supports_quic =
1061
        validateTransportSocketSupportsQuic(cluster.transport_socket());
1061
    SET_AND_RETURN_IF_NOT_OK(supports_quic.status(), creation_status);
1061
    if (!*supports_quic) {
1
      creation_status = absl::InvalidArgumentError(
1
          fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}",
1
                      cluster.name(), cluster.transport_socket().DebugString()));
1
      return;
1
    }
#else
    creation_status = absl::InvalidArgumentError("HTTP3 configured but not enabled in the build.");
    return;
#endif
1061
  }
  // Create the default (empty) priority set before registering callbacks to
  // avoid getting an update the first time it is accessed.
18086
  priority_set_.getOrCreateHostSet(0);
18086
  // On every priority update: bump membership_change when hosts were added or removed, then
  // recompute the membership gauges by summing over all priorities.
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
18565
      [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
17855
        if (!hosts_added.empty() || !hosts_removed.empty()) {
17588
          info_->endpointStats().membership_change_.inc();
17588
        }
17855
        uint32_t healthy_hosts = 0;
17855
        uint32_t degraded_hosts = 0;
17855
        uint32_t excluded_hosts = 0;
17855
        uint32_t hosts = 0;
18167
        for (const auto& host_set : prioritySet().hostSetsPerPriority()) {
18167
          hosts += host_set->hosts().size();
18167
          healthy_hosts += host_set->healthyHosts().size();
18167
          degraded_hosts += host_set->degradedHosts().size();
18167
          excluded_hosts += host_set->excludedHosts().size();
18167
        }
17855
        info_->endpointStats().membership_total_.set(hosts);
17855
        info_->endpointStats().membership_healthy_.set(healthy_hosts);
17855
        info_->endpointStats().membership_degraded_.set(degraded_hosts);
17855
        info_->endpointStats().membership_excluded_.set(excluded_hosts);
17855
      });
  // Drop overload configuration parsing.
18086
  SET_AND_RETURN_IF_NOT_OK(parseDropOverloadConfig(cluster.load_assignment()), creation_status);
18054
}
namespace {
37987
bool excludeBasedOnHealthFlag(const Host& host) {
37987
  return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
37987
         host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL) ||
37987
         host.healthFlagGet(Host::HealthFlag::EDS_STATUS_DRAINING);
37987
}
} // namespace
std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
           ExcludedHostVectorConstSharedPtr>
18017
ClusterImplBase::partitionHostList(const HostVector& hosts) {
18017
  auto healthy_list = std::make_shared<HealthyHostVector>();
18017
  auto degraded_list = std::make_shared<DegradedHostVector>();
18017
  auto excluded_list = std::make_shared<ExcludedHostVector>();
18017
  healthy_list->get().reserve(hosts.size());
19242
  for (const auto& host : hosts) {
19218
    const Host::Health health_status = host->coarseHealth();
19218
    if (health_status == Host::Health::Healthy) {
18621
      healthy_list->get().emplace_back(host);
18762
    } else if (health_status == Host::Health::Degraded) {
66
      degraded_list->get().emplace_back(host);
66
    }
19218
    if (excludeBasedOnHealthFlag(*host)) {
16
      excluded_list->get().emplace_back(host);
16
    }
19218
  }
18017
  return std::make_tuple(healthy_list, degraded_list, excluded_list);
18017
}
std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
           HostsPerLocalityConstSharedPtr>
18017
ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) {
18017
  auto filtered_clones =
18957
      hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; },
18957
                    [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; },
18957
                    [](const Host& host) { return excludeBasedOnHealthFlag(host); }});
18017
  return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]),
18017
                         std::move(filtered_clones[2]));
18017
}
47504
bool ClusterInfoImpl::maintenanceMode() const {
47504
  return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0);
47504
}
413612
// Returns the resource manager stored for `priority`. The priority's integer value is used
// directly as the index into the managers array; an out-of-range index trips the ASSERT.
ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const {
  const auto slot = enumToInt(priority);
  ASSERT(slot < resource_managers_.managers_.size());
  return *resource_managers_.managers_[slot];
}
17684
// Begins cluster initialization.
//
// Stores `callback` as the initialization-complete callback and then starts the pre-init
// phase via startPreInit(). Must be called before initialization has started and while no
// completion callback is registered; both preconditions are ASSERTed.
//
// @param callback invoked by finishInitialization() once initialization is complete.
void ClusterImplBase::initialize(std::function<absl::Status()> callback) {
  ASSERT(!initialization_started_);
  ASSERT(initialization_complete_callback_ == nullptr);
  // `callback` is a by-value sink parameter that is not used again, so move it into the
  // member instead of copying the std::function (and whatever state it captured).
  initialization_complete_callback_ = std::move(callback);
  startPreInit();
}
17982
void ClusterImplBase::onPreInitComplete() {
  // Protect against multiple calls.
17982
  if (initialization_started_) {
456
    return;
456
  }
17526
  initialization_started_ = true;
17526
  ENVOY_LOG(debug, "initializing {} cluster {} completed",
17526
            initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary",
17526
            info()->name());
17526
  init_manager_.initialize(init_watcher_);
17526
}
17526
void ClusterImplBase::onInitDone() {
17526
  info()->configUpdateStats().warming_state_.set(0);
17526
  if (health_checker_ && pending_initialize_health_checks_ == 0) {
110
    for (auto& host_set : prioritySet().hostSetsPerPriority()) {
130
      for (auto& host : host_set->hosts()) {
111
        if (host->disableActiveHealthCheck()) {
6
          continue;
6
        }
105
        ++pending_initialize_health_checks_;
105
      }
110
    }
107
    ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}",
107
              pending_initialize_health_checks_);
    // TODO(mattklein123): Remove this callback when done.
107
    health_checker_->addHostCheckCompleteCb(
173
        [this](HostSharedPtr, HealthTransition, HealthState) -> void {
151
          if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) {
60
            finishInitialization();
60
          }
151
        });
107
  }
17526
  if (pending_initialize_health_checks_ == 0) {
17442
    finishInitialization();
17442
  }
17526
}
17502
void ClusterImplBase::finishInitialization() {
17502
  ASSERT(initialization_complete_callback_ != nullptr);
17502
  ASSERT(initialization_started_);
  // Snap a copy of the completion callback so that we can set it to nullptr to unblock
  // reloadHealthyHosts(). See that function for more info on why we do this.
17502
  auto snapped_callback = initialization_complete_callback_;
17502
  initialization_complete_callback_ = nullptr;
17502
  if (health_checker_ != nullptr) {
83
    reloadHealthyHosts(nullptr);
83
  }
17502
  if (snapped_callback != nullptr) {
17502
    THROW_IF_NOT_OK(snapped_callback());
17502
  }
17502
}
// Parses the drop_overloads policy of `cluster_load_assignment` into drop_overload_ and
// drop_category_.
//
// drop_overload_ is reset to zero first and stays zero when there is no policy, when the
// policy has no drop_overloads entries, or when an error is returned. Otherwise the first
// entry's fractional percentage is converted to a ratio, capped by the DropOverloadRuntimeKey
// runtime value (a percentage that defaults to 100), and stored together with the entry's
// category.
//
// Returns InvalidArgumentError when more than kDropOverloadSize entries are configured, when
// the denominator enum is not one of HUNDRED/TEN_THOUSAND/MILLION, when the configured ratio
// exceeds 1, or when the runtime value exceeds 100.
absl::Status ClusterImplBase::parseDropOverloadConfig(
    const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) {
  // Start from "drop nothing" so that every early return leaves drop_overload_ at zero.
  drop_overload_ = UnitFloat(0);

  if (!cluster_load_assignment.has_policy()) {
    return absl::OkStatus();
  }
  const auto& policy = cluster_load_assignment.policy();
  const auto& overloads = policy.drop_overloads();
  if (overloads.empty()) {
    return absl::OkStatus();
  }
  if (overloads.size() > kDropOverloadSize) {
    return absl::InvalidArgumentError(
        fmt::format("Cluster drop_overloads config has {} categories. Envoy only support one.",
                    overloads.size()));
  }

  // Only the first entry is consulted from here on.
  const auto& first_overload = policy.drop_overloads(0);
  const auto& drop_percentage = first_overload.drop_percentage();

  // Translate the denominator enum into the number the numerator is divided by.
  float denominator = 100;
  const auto denominator_kind = drop_percentage.denominator();
  switch (denominator_kind) {
  case envoy::type::v3::FractionalPercent::HUNDRED:
    denominator = 100;
    break;
  case envoy::type::v3::FractionalPercent::TEN_THOUSAND:
    denominator = 10000;
    break;
  case envoy::type::v3::FractionalPercent::MILLION:
    denominator = 1000000;
    break;
  default:
    return absl::InvalidArgumentError(fmt::format(
        "Cluster drop_overloads config denominator setting is invalid : {}. Valid range 0~2.",
        static_cast<int>(denominator_kind)));
  }

  float drop_ratio = static_cast<float>(drop_percentage.numerator()) / denominator;
  if (drop_ratio > 1) {
    return absl::InvalidArgumentError(
        fmt::format("Cluster drop_overloads config is invalid. drop_ratio={}(Numerator {} / "
                    "Denominator {}). The valid range is 0~1.",
                    drop_ratio, drop_percentage.numerator(), denominator));
  }

  // If DropOverloadRuntimeKey is not set, the default of 100 makes the cap a no-op and the
  // configured ratio is honored. If it is set, the smaller of the two ratios wins.
  const uint64_t max_runtime_percent = 100;
  const uint64_t runtime_percent =
      runtime_.snapshot().getInteger(ClusterImplBase::DropOverloadRuntimeKey, max_runtime_percent);
  if (runtime_percent > max_runtime_percent) {
    return absl::InvalidArgumentError(
        fmt::format("load_balancing_policy.drop_overload_limit runtime key config {} is invalid. "
                    "The valid range is 0~100",
                    runtime_percent));
  }
  const float runtime_ratio =
      static_cast<float>(runtime_percent) / static_cast<float>(max_runtime_percent);
  drop_ratio = std::min(drop_ratio, runtime_ratio);

  drop_overload_ = UnitFloat(drop_ratio);
  drop_category_ = first_overload.category();
  return absl::OkStatus();
}
121
void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) {
121
  ASSERT(!health_checker_);
121
  health_checker_ = health_checker;
121
  health_checker_->start();
121
  health_checker_->addHostCheckCompleteCb(
187
      [this](const HostSharedPtr& host, HealthTransition changed_state, HealthState) -> void {
        // If we get a health check completion that resulted in a state change, signal to
        // update the host sets on all threads.
151
        if (changed_state == HealthTransition::Changed) {
107
          reloadHealthyHosts(host);
107
        }
151
      });
121
}
17749
// Stores `outlier_detector` and registers a changed-state callback on it that reloads the
// healthy hosts for the affected host. A null detector is ignored.
void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) {
  if (outlier_detector) {
    outlier_detector_ = outlier_detector;
    outlier_detector_->addChangedStateCb(
        [this](const HostSharedPtr& changed_host) -> void { reloadHealthyHosts(changed_host); });
  }
}
213
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
  // Every time a host changes Health Check state we cause a full healthy host recalculation which
  // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
  // can also block worker threads by doing this repeatedly. There is no reason to do this
  // as we will not start taking traffic until we are initialized. By blocking Health Check
  // updates while initializing we can avoid this.
213
  if (initialization_complete_callback_ != nullptr) {
46
    return;
46
  }
167
  reloadHealthyHostsHelper(host);
167
}
101
void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
101
  const auto& host_sets = prioritySet().hostSetsPerPriority();
202
  for (size_t priority = 0; priority < host_sets.size(); ++priority) {
101
    const auto& host_set = host_sets[priority];
    // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
101
    HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());
101
    HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
101
    prioritySet().updateHosts(priority,
101
                              HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
101
                              host_set->localityWeights(), {}, {}, absl::nullopt, absl::nullopt);
101
  }
101
}
// Resolves `address` through Network::Address::resolveProtoAddress().
//
// Returns the resolved address on success. On failure (either a non-OK status from the
// resolver or an EnvoyException, which is converted to InvalidArgumentError) the error is
// returned; for STATIC and EDS clusters the message is extended with a hint about
// resolver_name and the DNS cluster types.
absl::StatusOr<const Network::Address::InstanceConstSharedPtr>
ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) {
  absl::Status failure;
  TRY_ASSERT_MAIN_THREAD {
    auto resolved = Network::Address::resolveProtoAddress(address);
    if (!resolved.status().ok()) {
      failure = resolved.status();
    } else {
      return resolved.value();
    }
  }
  END_TRY
  CATCH(EnvoyException & e, { failure = absl::InvalidArgumentError(e.what()); });

  const bool is_static = info_->type() == envoy::config::cluster::v3::Cluster::STATIC;
  if (is_static || info_->type() == envoy::config::cluster::v3::Cluster::EDS) {
    return absl::InvalidArgumentError(
        fmt::format("{}. Consider setting resolver_name or setting cluster type "
                    "to 'STRICT_DNS' or 'LOGICAL_DNS'",
                    failure.message()));
  }
  return failure;
}
// Validates endpoint configuration for this cluster.
//
// A local cluster must not contain any locality with a non-zero priority. Beyond that, when
// `priorities` is provided and the cluster has a load balancer config, validation is
// delegated to that config. Returns OkStatus when no check fails.
absl::Status ClusterImplBase::validateEndpoints(
    absl::Span<const envoy::config::endpoint::v3::LocalityLbEndpoints* const> localities,
    OptRef<const PriorityState> priorities) const {
  if (local_cluster_) {
    for (const auto* locality : localities) {
      if (locality->priority() > 0) {
        return absl::InvalidArgumentError(
            fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name()));
      }
    }
  }

  if (!priorities.has_value()) {
    return absl::OkStatus();
  }

  OptRef<const LoadBalancerConfig> lb_config = info_->loadBalancerConfig();
  if (!lb_config.has_value()) {
    return absl::OkStatus();
  }
  return lb_config->validateEndpoints(*priorities);
}
// Creates the optional per-cluster stat groups that `config` asks for.
//
// Timeout budget stats are created when either track_cluster_stats.timeout_budgets or
// track_timeout_budgets is set; request/response size stats are created when
// track_cluster_stats.request_response_sizes is set. A group that is not requested is left
// as nullptr.
ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats(
    const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope,
    const ClusterManager& manager)
    : timeout_budget_stats_(
          !(config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets())
              ? nullptr
              : std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats(
                    stats_scope, manager.clusterTimeoutBudgetStatNames()))),
      request_response_size_stats_(
          !config.track_cluster_stats().request_response_sizes()
              ? nullptr
              : std::make_unique<ClusterRequestResponseSizeStats>(generateRequestResponseSizeStats(
                    stats_scope, manager.clusterRequestResponseSizeStatNames()))) {}
// Builds one resource manager per routing priority (DEFAULT first, then HIGH) and stores each
// in the slot of the matching ResourcePriority. A load() failure is surfaced through
// THROW_OR_RETURN_VALUE.
ClusterInfoImpl::ResourceManagers::ResourceManagers(
    const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
    const std::string& cluster_name, Stats::Scope& stats_scope,
    const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names)
    : circuit_breakers_stat_names_(circuit_breakers_stat_names) {
  const auto load_or_throw =
      [&](const envoy::config::core::v3::RoutingPriority& routing_priority)
      -> ResourceManagerImplPtr {
    return THROW_OR_RETURN_VALUE(
        load(config, runtime, cluster_name, stats_scope, routing_priority),
        ResourceManagerImplPtr);
  };

  managers_[enumToInt(ResourcePriority::Default)] =
      load_or_throw(envoy::config::core::v3::DEFAULT);
  managers_[enumToInt(ResourcePriority::High)] = load_or_throw(envoy::config::core::v3::HIGH);
}
// Assembles the circuit breaker gauges for one priority.
//
// Every gauge is looked up in `scope` from the elements {circuit_breakers, prefix, name} with
// the Accumulate import mode. The five "remaining" gauges are only real gauges when
// `track_remaining` is set; otherwise the store's null gauge is used in their place.
ClusterCircuitBreakersStats
ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix,
                                              bool track_remaining,
                                              const ClusterCircuitBreakersStatNames& stat_names) {
  const auto gauge_for = [&stat_names, &scope, prefix](Stats::StatName name) -> Stats::Gauge& {
    return Stats::Utility::gaugeFromElements(scope, {stat_names.circuit_breakers_, prefix, name},
                                             Stats::Gauge::ImportMode::Accumulate);
  };
  const auto remaining_gauge_for = [&](Stats::StatName name) -> Stats::Gauge& {
    return track_remaining ? gauge_for(name) : scope.store().nullGauge();
  };

  // The initializers are listed in the member order of ClusterCircuitBreakersStats.
  return {
      gauge_for(stat_names.cx_open_),
      gauge_for(stat_names.cx_pool_open_),
      gauge_for(stat_names.rq_open_),
      gauge_for(stat_names.rq_pending_open_),
      gauge_for(stat_names.rq_retry_open_),
      remaining_gauge_for(stat_names.remaining_cx_),
      remaining_gauge_for(stat_names.remaining_cx_pools_),
      remaining_gauge_for(stat_names.remaining_pending_),
      remaining_gauge_for(stat_names.remaining_retries_),
      remaining_gauge_for(stat_names.remaining_rq_),
  };
}
21958
// Returns the HTTP/1 codec stats for this cluster, obtained from CodecStats::atomicGet() with
// the cluster's HTTP/1 stats slot and stats scope.
Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const {
  auto& codec_stats = Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_);
  return codec_stats;
}
6078
// Returns the HTTP/2 codec stats for this cluster, obtained from CodecStats::atomicGet() with
// the cluster's HTTP/2 stats slot and stats scope.
Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const {
  auto& codec_stats = Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_);
  return codec_stats;
}
934
// Returns the HTTP/3 codec stats for this cluster, obtained from CodecStats::atomicGet() with
// the cluster's HTTP/3 stats slot and stats scope.
Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const {
  auto& codec_stats = Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_);
  return codec_stats;
}
#ifdef ENVOY_ENABLE_UHV
// Selects the header validator stats for `protocol` by returning the codec stats of the
// matching HTTP version. HTTP/1.0 and HTTP/1.1 both map to the HTTP/1 codec stats.
::Envoy::Http::HeaderValidatorStats&
ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const {
  // No default label: every enumerator handled below returns directly.
  switch (protocol) {
  case Http::Protocol::Http10:
  case Http::Protocol::Http11:
    return http1CodecStats();
  case Http::Protocol::Http2:
    return http2CodecStats();
  case Http::Protocol::Http3:
    return http3CodecStats();
  }
  // Only reached when `protocol` matches none of the cases above.
  PANIC_DUE_TO_CORRUPT_ENUM;
}
#endif
// Creates a client header validator for `protocol`.
//
// With ENVOY_ENABLE_UHV defined, the validator comes from the header validator factory in the
// HTTP protocol options, or is nullptr when no factory is configured. Without
// ENVOY_ENABLE_UHV this always returns nullptr and `protocol` is unused.
Http::ClientHeaderValidatorPtr
ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const {
#ifdef ENVOY_ENABLE_UHV
  const auto& validator_factory = http_protocol_options_->header_validator_factory_;
  if (!validator_factory) {
    return nullptr;
  }
  return validator_factory->createClientHeaderValidator(protocol,
                                                        getHeaderValidatorStats(protocol));
#else
  return nullptr;
#endif
}
std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams(
261
    const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) {
261
  constexpr double default_budget_percent = 20.0;
261
  constexpr uint32_t default_retry_concurrency = 3;
261
  absl::optional<double> budget_percent;
261
  absl::optional<uint32_t> min_retry_concurrency;
261
  if (thresholds.has_retry_budget()) {
    // The budget_percent and min_retry_concurrency values are only set if there is a retry budget
    // message set in the cluster config.
8
    budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent,
8
                                                     default_budget_percent);
8
    min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
8
        thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency);
8
  }
261
  return std::make_pair(budget_percent, min_retry_concurrency);
261
}
SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);
std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager(
18122
    Server::Configuration::ServerFactoryContext& context) {
18122
  return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>(
18122
      SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager),
18122
      [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); });
18122
}
// Builds the resource manager for one routing priority of `config`.
//
// Limits start from built-in defaults (1024 connections, pending requests and requests, 3
// retries, unlimited connection pools and per-host connections) and are overridden by the
// first circuit breaker thresholds entry whose priority matches. The first matching
// per_host_thresholds entry may only set max_connections; any other field set on it yields
// InvalidArgumentError. The runtime prefix is "circuit_breakers.<cluster_name>.<priority>.".
absl::StatusOr<ResourceManagerImplPtr>
ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
                                        Runtime::Loader& runtime, const std::string& cluster_name,
                                        Stats::Scope& stats_scope,
                                        const envoy::config::core::v3::RoutingPriority& priority) {
  // Built-in limits, used for anything the configuration does not override.
  uint64_t max_connections = 1024;
  uint64_t max_pending_requests = 1024;
  uint64_t max_requests = 1024;
  uint64_t max_retries = 3;
  uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max();
  uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max();
  bool track_remaining = false;

  // Pick the stat name and the runtime key component for this priority.
  Stats::StatName priority_stat_name;
  std::string priority_name;
  switch (priority) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::config::core::v3::DEFAULT:
    priority_name = "default";
    priority_stat_name = circuit_breakers_stat_names_.default_;
    break;
  case envoy::config::core::v3::HIGH:
    priority_name = "high";
    priority_stat_name = circuit_breakers_stat_names_.high_;
    break;
  }
  const std::string runtime_prefix =
      fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name);

  // Locate the first cluster-wide and the first per-host thresholds entry for this priority.
  const auto has_requested_priority =
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& candidate) {
        return candidate.priority() == priority;
      };
  const auto& thresholds = config.circuit_breakers().thresholds();
  const auto it = std::find_if(thresholds.cbegin(), thresholds.cend(), has_requested_priority);
  const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds();
  const auto per_host_it = std::find_if(per_host_thresholds.cbegin(), per_host_thresholds.cend(),
                                        has_requested_priority);

  absl::optional<double> budget_percent;
  absl::optional<uint32_t> min_retry_concurrency;
  if (it != thresholds.cend()) {
    const auto& configured = *it;
    // Each wrapped field replaces the built-in limit only when it is present.
    max_connections =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(configured, max_connections, max_connections);
    max_pending_requests =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(configured, max_pending_requests, max_pending_requests);
    max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(configured, max_requests, max_requests);
    max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(configured, max_retries, max_retries);
    track_remaining = configured.track_remaining();
    max_connection_pools =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(configured, max_connection_pools, max_connection_pools);
    std::tie(budget_percent, min_retry_concurrency) =
        ClusterInfoImpl::getRetryBudgetParams(configured);
  }

  if (per_host_it != per_host_thresholds.cend()) {
    const auto& per_host = *per_host_it;
    // max_connections is the only field accepted in a per-host thresholds entry.
    const bool has_unsupported_field =
        per_host.has_max_pending_requests() || per_host.has_max_requests() ||
        per_host.has_max_retries() || per_host.has_max_connection_pools() ||
        per_host.has_retry_budget();
    if (has_unsupported_field) {
      return absl::InvalidArgumentError("Unsupported field in per_host_thresholds");
    }
    if (per_host.has_max_connections()) {
      max_connections_per_host = per_host.max_connections().value();
    }
  }

  return std::make_unique<ResourceManagerImpl>(
      runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries,
      max_connection_pools, max_connections_per_host,
      ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name,
                                                    track_remaining, circuit_breakers_stat_names_),
      budget_percent, min_retry_concurrency);
}
PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
                                           const LocalInfo::LocalInfo& local_info,
                                           PrioritySet::HostUpdateCb* update_cb)
17692
    : parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb) {}
void PriorityStateManager::initializePriorityFor(
17992
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
17992
  const uint32_t priority = locality_lb_endpoint.priority();
17992
  if (priority_state_.size() <= priority) {
17639
    priority_state_.resize(priority + 1);
17639
  }
17992
  if (priority_state_[priority].first == nullptr) {
17641
    priority_state_[priority].first = std::make_unique<HostVector>();
17641
  }
17992
  if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) {
77
    priority_state_[priority].second[locality_lb_endpoint.locality()] =
77
        locality_lb_endpoint.load_balancing_weight().value();
77
  }
17992
}
void PriorityStateManager::registerHostForPriority(
    const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
    const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
18079
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint) {
18079
  auto endpoint_metadata =
18079
      lb_endpoint.has_metadata()
18079
          ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
18079
          : nullptr;
18079
  auto locality_metadata =
18079
      locality_lb_endpoint.has_metadata()
18079
          ? parent_.constMetadataSharedPool()->getObject(locality_lb_endpoint.metadata())
18079
          : nullptr;
18079
  const auto host = std::shared_ptr<HostImpl>(THROW_OR_RETURN_VALUE(
18079
      HostImpl::create(
18079
          parent_.info(), hostname, address, endpoint_metadata, locality_metadata,
18079
          lb_endpoint.load_balancing_weight().value(),
18079
          parent_.constLocalitySharedPool()->getObject(locality_lb_endpoint.locality()),
18079
          lb_endpoint.endpoint().health_check_config(), locality_lb_endpoint.priority(),
18079
          lb_endpoint.health_status(), address_list),
18079
      std::unique_ptr<HostImpl>));
18079
  registerHostForPriority(host, locality_lb_endpoint);
18079
}
void PriorityStateManager::registerHostForPriority(
    const HostSharedPtr& host,
18489
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
18489
  const uint32_t priority = locality_lb_endpoint.priority();
  // Should be called after initializePriorityFor.
18489
  ASSERT(priority_state_[priority].first);
18489
  priority_state_[priority].first->emplace_back(host);
18489
}
// Publishes `current_hosts` as the host set for `priority`.
//
// The hosts are grouped by locality, with the local locality's group (if it has any hosts)
// placed first and the remaining groups following in LocalityLess order. A parallel vector of
// locality weights is built from the weights recorded for this priority. The result is pushed
// through the batch update callback when one was provided at construction, and directly into
// the parent cluster's priority set otherwise.
//
// @param priority the priority being updated.
// @param current_hosts the complete host list for the priority; ownership is taken.
// @param hosts_added hosts reported as added; defaults to all of `current_hosts` when unset.
// @param hosts_removed hosts reported as removed; defaults to an empty list when unset.
// @param health_checker_flag when set, this flag is set on every host that has not disabled
//        active health checking.
// @param weighted_priority_health forwarded unchanged to updateHosts().
// @param overprovisioning_factor forwarded unchanged to updateHosts().
void PriorityStateManager::updateClusterPrioritySet(
    const uint32_t priority, HostVectorSharedPtr&& current_hosts,
    const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed,
    const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
    absl::optional<bool> weighted_priority_health,
    absl::optional<uint32_t> overprovisioning_factor) {
  // If local locality is not defined then skip populating per locality hosts.
  const auto& local_locality = local_info_node_.locality();
  ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString());

  // For non-EDS, most likely the current hosts are from priority_state_[priority].first.
  HostVectorSharedPtr hosts(std::move(current_hosts));

  // Use the weights recorded for this priority, or an empty scratch map when the priority has
  // no state. Note that the operator[] lookups below insert a default weight for localities
  // that have no recorded weight.
  LocalityWeightsMap empty_locality_map;
  LocalityWeightsMap& locality_weights_map =
      priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map;
  ASSERT(priority_state_.size() > priority || locality_weights_map.empty());

  // TODO: have the load balancing extension indicate, programmatically, whether it needs locality
  // weights, as an optimization in cases where it doesn't.
  LocalityWeightsSharedPtr locality_weights = std::make_shared<LocalityWeights>();

  // We use std::map to guarantee a stable ordering for zone aware routing.
  std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality;
  for (const HostSharedPtr& host : *hosts) {
    // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all
    // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire
    // update callbacks to start the health checking process. The endpoint with disabled active
    // health check should not be set FAILED_ACTIVE_HC here.
    if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) {
      host->healthFlagSet(health_checker_flag.value());
    }
    hosts_per_locality[host->locality()].push_back(host);
  }

  // Do we have hosts for the local locality?
  const bool non_empty_local_locality =
      local_info_node_.has_locality() &&
      hosts_per_locality.find(local_locality) != hosts_per_locality.end();

  // One group per locality is emitted below, so size the output once up front.
  std::vector<HostVector> per_locality;
  per_locality.reserve(hosts_per_locality.size());

  // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts
  // first if non_empty_local_locality. This group is copied rather than moved so that the map
  // entry stays intact for the loop below.
  if (non_empty_local_locality) {
    per_locality.emplace_back(hosts_per_locality[local_locality]);
    locality_weights->emplace_back(locality_weights_map[local_locality]);
  }

  // After the local locality hosts (if any), we place the remaining locality host groups in
  // lexicographic order. This provides a stable ordering for zone aware routing.
  for (auto& entry : hosts_per_locality) {
    if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) {
      // hosts_per_locality is not read again after this loop and each entry is visited exactly
      // once, so the group can be moved out instead of copying every host shared_ptr.
      per_locality.emplace_back(std::move(entry.second));
      locality_weights->emplace_back(locality_weights_map[entry.first]);
    }
  }

  auto per_locality_shared =
      std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality);

  // If a batch update callback was provided, use that. Otherwise directly update
  // the PrioritySet.
  if (update_cb_ != nullptr) {
    update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
                            std::move(locality_weights), hosts_added.value_or(*hosts),
                            hosts_removed.value_or<HostVector>({}), weighted_priority_health,
                            overprovisioning_factor);
  } else {
    parent_.prioritySet().updateHosts(
        priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
        std::move(locality_weights), hosts_added.value_or(*hosts),
        hosts_removed.value_or<HostVector>({}), weighted_priority_health, overprovisioning_factor);
  }
}
bool BaseDynamicClusterImpl::updateDynamicHostList(
    const HostVector& new_hosts, HostVector& current_priority_hosts,
    HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority,
1109
    const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
1109
  uint64_t max_host_weight = 1;
  // Did hosts change?
  //
  // Have host attributes changed the health of any endpoint? If so, we
  // rebuild the hosts vectors. We only do this if the health status of an
  // endpoint has materially changed (e.g. if previously failing active health
  // checks, we just note it's now failing EDS health status but don't rebuild).
  //
  // TODO(htuch): We can be smarter about this potentially, and not force a full
  // host set update on health status change. The way this would work is to
  // implement a HealthChecker subclass that provides thread local health
  // updates to the Cluster object. This will probably make sense to do in
  // conjunction with https://github.com/envoyproxy/envoy/issues/2874.
1109
  bool hosts_changed = false;
  // Go through and see if the list we have is different from what we just got. If it is, we make
  // a new host list and raise a change notification. We also check for duplicates here. It's
  // possible for DNS to return the same address multiple times, and a bad EDS implementation
  // could do the same thing.
  // Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
1109
  absl::flat_hash_set<std::string> existing_hosts_for_current_priority(
1109
      current_priority_hosts.size());
  // Keep track of hosts we're adding (or replacing)
1109
  absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size());
  // Keep track of hosts for which locality is changed.
1109
  absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority(
1109
      current_priority_hosts.size());
  // Keep track of hosts for which active health check flag is changed.
1109
  absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed(
1109
      current_priority_hosts.size());
1109
  HostVector final_hosts;
1109
  final_hosts.reserve(new_hosts.size() + current_priority_hosts.size());
1509
  for (const HostSharedPtr& host : new_hosts) {
    // To match a new host with an existing host means comparing their addresses.
1445
    const auto host_address_string = addressToString(host->address());
1445
    auto existing_host = all_hosts.find(host_address_string);
1445
    const bool existing_host_found = existing_host != all_hosts.end();
    // Clear any pending deletion flag on an existing host in case it came back while it was
    // being stabilized. We will set it again below if needed.
1445
    if (existing_host_found) {
516
      existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
516
    }
    // Check if in-place host update should be skipped, i.e. when the following criteria are met
    // (currently there is only one criterion, but we might add more in the future):
    // - The cluster health checker is activated and a new host is matched with the existing one,
    //   but the health check address is different.
1445
    const bool health_check_address_changed =
1445
        (health_checker_ != nullptr && existing_host_found &&
1445
         *existing_host->second->healthCheckAddress() != *host->healthCheckAddress());
1445
    bool locality_changed = false;
1445
    locality_changed = (existing_host_found &&
1445
                        (!LocalityEqualTo()(host->locality(), existing_host->second->locality())));
1445
    if (locality_changed) {
14
      hosts_with_updated_locality_for_current_priority.emplace(existing_host->first);
14
    }
1445
    const bool active_health_check_flag_changed =
1445
        (health_checker_ != nullptr && existing_host_found &&
1445
         existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck());
1445
    if (active_health_check_flag_changed) {
4
      hosts_with_active_health_check_flag_changed.emplace(existing_host->first);
4
    }
1445
    const bool skip_inplace_host_update =
1445
        health_check_address_changed || locality_changed || active_health_check_flag_changed;
    // When there is a match and we decided to do in-place update, we potentially update the
    // host's health check flag and metadata. Afterwards, the host is pushed back into the
    // final_hosts, i.e. hosts that should be preserved in the current priority.
1445
    if (existing_host_found && !skip_inplace_host_update) {
492
      existing_hosts_for_current_priority.emplace(existing_host->first);
      // If we find a host matched based on address, we keep it. However we do change weight
      // inline so do that here.
492
      if (host->weight() > max_host_weight) {
6
        max_host_weight = host->weight();
6
      }
492
      if (existing_host->second->weight() != host->weight()) {
60
        existing_host->second->weight(host->weight());
        // We do full host set rebuilds so that load balancers can do pre-computation of data
        // structures based on host weight. This may become a performance problem in certain
        // deployments so it is runtime feature guarded and may also need to be configurable
        // and/or dynamic in the future.
60
        hosts_changed = true;
60
      }
492
      hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second);
      // Did metadata change? Compare cached hashes for O(1) comparison.
492
      const bool metadata_changed = host->metadataHash() != existing_host->second->metadataHash();
492
      if (metadata_changed) {
        // First, update the entire metadata for the endpoint.
1
        existing_host->second->metadata(host->metadata());
        // Also, given that the canary attribute of an endpoint is derived from its metadata
        // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing
        // to see if it actually changed. We must update this besides just updating the metadata,
        // because it'll be used by the router filter to compute upstream stats.
1
        existing_host->second->canary(host->canary());
        // If metadata changed, we need to rebuild. See github issue #3810.
1
        hosts_changed = true;
1
      }
      // Did the priority change?
492
      if (host->priority() != existing_host->second->priority()) {
54
        existing_host->second->priority(host->priority());
54
        hosts_added_to_current_priority.emplace_back(existing_host->second);
54
      }
492
      final_hosts.push_back(existing_host->second);
1050
    } else {
953
      new_hosts_for_current_priority.emplace(host_address_string);
953
      if (host->weight() > max_host_weight) {
11
        max_host_weight = host->weight();
11
      }
      // If we are depending on a health checker, we initialize to unhealthy.
      // If there's an existing host with the same health checker, the
      // active health-status is kept.
953
      if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) {
99
        if (existing_host_found && !health_check_address_changed &&
99
            !active_health_check_flag_changed) {
          // If there's an existing host, use the same active health-status.
          // The existing host can be marked PENDING_ACTIVE_HC or
          // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC.
7
          ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
7
                 existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
7
          ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) ||
7
                 existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
7
          constexpr uint32_t active_hc_statuses_mask =
7
              enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) |
7
              enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) |
7
              enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) |
7
              enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
7
          const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll();
7
          host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask);
92
        } else {
          // No previous known host, mark it as failed active HC.
92
          host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
          // If we want to exclude hosts until they have been health checked, mark them with
          // a flag to indicate that they have not been health checked yet.
92
          if (info_->warmHosts()) {
4
            host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
4
          }
92
        }
99
      }
953
      final_hosts.push_back(host);
953
      hosts_added_to_current_priority.push_back(host);
953
    }
1445
  }
  // Remove hosts from current_priority_hosts that were matched to an existing host in the
  // previous loop.
1109
  auto erase_from =
1109
      std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
1243
                     [&existing_hosts_for_current_priority](const HostSharedPtr& p) {
667
                       auto existing_itr =
667
                           existing_hosts_for_current_priority.find(p->address()->asString());
667
                       if (existing_itr != existing_hosts_for_current_priority.end()) {
438
                         existing_hosts_for_current_priority.erase(existing_itr);
438
                         return true;
438
                       }
229
                       return false;
667
                     });
1109
  current_priority_hosts.erase(erase_from, current_priority_hosts.end());
  // If we saw existing hosts during this iteration from a different priority, then we've moved
  // a host from another priority into this one, so we should mark the priority as having changed.
1109
  if (!existing_hosts_for_current_priority.empty()) {
32
    hosts_changed = true;
32
  }
  // The remaining hosts are hosts that are not referenced in the config update. We remove them
  // from the priority if any of the following is true:
  // - Active health checking is not enabled.
  // - The removed hosts are failing active health checking OR have been explicitly marked as
  //   unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host
  //   or any other future health condition that may be added so we do not use the coarseHealth()
  //   API.
  // - We have explicitly configured the cluster to remove hosts regardless of active health
  // status.
1109
  const bool dont_remove_healthy_hosts =
1109
      health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
1109
  if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
42
    erase_from = std::remove_if(
42
        current_priority_hosts.begin(), current_priority_hosts.end(),
42
        [&all_new_hosts, &new_hosts_for_current_priority,
42
         &hosts_with_updated_locality_for_current_priority,
42
         &hosts_with_active_health_check_flag_changed, &final_hosts,
60
         &max_host_weight](const HostSharedPtr& p) {
60
          const auto address_string = addressToString(p->address());
          // This host has already been added as a new host in the
          // new_hosts_for_current_priority. Return false here to make sure that host
          // reference with older locality gets cleaned up from the priority.
60
          if (hosts_with_updated_locality_for_current_priority.contains(address_string)) {
6
            return false;
6
          }
54
          if (hosts_with_active_health_check_flag_changed.contains(address_string)) {
4
            return false;
4
          }
50
          if (all_new_hosts.contains(address_string) &&
50
              !new_hosts_for_current_priority.contains(address_string)) {
            // If the address is being completely deleted from this priority, but is
            // referenced from another priority, then we assume that the other
            // priority will perform an in-place update to re-use the existing Host.
            // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but
            // instead remove it immediately from this priority.
            // Example: health check address changed and priority also changed
28
            return false;
28
          }
          // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active
          // health check, the host is removed immediately from this priority.
22
          if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
22
                 p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) &&
22
              !p->disableActiveHealthCheck()) {
12
            if (p->weight() > max_host_weight) {
              max_host_weight = p->weight();
            }
12
            final_hosts.push_back(p);
12
            p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
12
            return true;
12
          }
10
          return false;
22
        });
42
    current_priority_hosts.erase(erase_from, current_priority_hosts.end());
42
  }
  // At this point we've accounted for all the new hosts as well the hosts that previously
  // existed in this priority.
1109
  info_->endpointStats().max_host_weight_.set(max_host_weight);
  // Whatever remains in current_priority_hosts should be removed.
1109
  if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) {
856
    hosts_removed_from_current_priority = std::move(current_priority_hosts);
856
    hosts_changed = true;
856
  }
  // During the update we populated final_hosts with all the hosts that should remain
  // in the current priority, so move them back into current_priority_hosts.
1109
  current_priority_hosts = std::move(final_hosts);
  // We return false here in the absence of EDS health status or metadata changes, because we
  // have no changes to host vector status (modulo weights). When we have EDS
  // health status or metadata changed, we return true, causing updateHosts() to fire in the
  // caller.
1109
  return hosts_changed;
1109
}
// Reads the `dns_lookup_family` field of the cluster config and translates it into a
// Network::DnsLookupFamily via DnsUtils::getDnsLookupFamilyFromEnum().
Network::DnsLookupFamily
getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) {
  const auto configured_family = cluster.dns_lookup_family();
  return DnsUtils::getDnsLookupFamilyFromEnum(configured_family);
}
// Records the destruction of an upstream connection in the traffic stats of the host's cluster.
// The overall destroy counter is always incremented; in addition the "remote" counter is
// incremented for a RemoteClose event and the "local" counter for every other event.
void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
                             Network::ConnectionEvent event) {
  Upstream::ClusterTrafficStats& traffic_stats = *host->cluster().trafficStats();
  traffic_stats.upstream_cx_destroy_.inc();

  const bool closed_by_remote = (event == Network::ConnectionEvent::RemoteClose);
  if (!closed_by_remote) {
    traffic_stats.upstream_cx_destroy_local_.inc();
    return;
  }
  traffic_stats.upstream_cx_destroy_remote_.inc();
}
// Records that an upstream connection was destroyed while a request was still active on it.
// The overall "with active request" counter is always incremented, followed by the remote
// variant for a RemoteClose event or the local variant for any other event.
void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
                                          Network::ConnectionEvent event) {
  Upstream::ClusterTrafficStats& traffic_stats = *host->cluster().trafficStats();
  traffic_stats.upstream_cx_destroy_with_active_rq_.inc();

  switch (event) {
  case Network::ConnectionEvent::RemoteClose:
    traffic_stats.upstream_cx_destroy_remote_with_active_rq_.inc();
    break;
  default:
    // Everything that is not a remote close is accounted as a local destroy.
    traffic_stats.upstream_cx_destroy_local_with_active_rq_.inc();
    break;
  }
}
// Determines the address that health checks should target for an endpoint.
//
// @param health_check_config the endpoint's health check config; its optional `address` and
//        `port_value` fields are consulted.
// @param host_address the endpoint's own address, used when no explicit health check address
//        is configured.
// @return - if `address` is set: the resolved address, with its port replaced by `port_value`
//           when `port_value` is non-zero and the resolved address is an IP address;
//         - otherwise: `host_address`, with its port replaced by `port_value` when `port_value`
//           is non-zero and `host_address` is an IP address.
//
// A non-OK status from resolveProtoAddress() is surfaced through THROW_IF_NOT_OK_REF.
Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    Network::Address::InstanceConstSharedPtr host_address) {
  Network::Address::InstanceConstSharedPtr health_check_address;
  const auto port_value = health_check_config.port_value();
  if (health_check_config.has_address()) {
    auto address_or_error = Network::Address::resolveProtoAddress(health_check_config.address());
    THROW_IF_NOT_OK_REF(address_or_error.status());
    auto address = address_or_error.value();
    // Only override the port on IP addresses, matching the guard applied to host_address below.
    // NOTE(review): presumably getAddressWithPort() requires an IP address — confirm.
    health_check_address = (port_value == 0 || !address->ip())
                               ? address
                               : Network::Utility::getAddressWithPort(*address, port_value);
  } else {
    // A port override is only applied when the host address is an IP address.
    health_check_address = (port_value == 0 || !host_address->ip())
                               ? host_address
                               : Network::Utility::getAddressWithPort(*host_address, port_value);
  }
  return health_check_address;
}
} // namespace Upstream
} // namespace Envoy