1
#include "source/extensions/clusters/dynamic_forward_proxy/cluster.h"
2

            
3
#include <algorithm>
4

            
5
#include "envoy/config/cluster/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h"
7
#include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.validate.h"
8
#include "envoy/router/string_accessor.h"
9
#include "envoy/stream_info/uint32_accessor.h"
10

            
11
#include "source/common/http/utility.h"
12
#include "source/common/network/filter_state_proxy_info.h"
13
#include "source/common/network/transport_socket_options_impl.h"
14
#include "source/common/router/string_accessor_impl.h"
15
#include "source/common/stream_info/uint32_accessor_impl.h"
16
#include "source/common/tls/cert_validator/default_validator.h"
17
#include "source/common/tls/utility.h"
18
#include "source/extensions/common/dynamic_forward_proxy/dns_cache_manager_impl.h"
19

            
20
namespace Envoy {
21
namespace Extensions {
22
namespace Clusters {
23
namespace DynamicForwardProxy {
24

            
25
namespace {
26
constexpr absl::string_view DynamicHostFilterStateKey = "envoy.upstream.dynamic_host";
27
constexpr absl::string_view DynamicPortFilterStateKey = "envoy.upstream.dynamic_port";
28

            
29
class DynamicHostObjectFactory : public StreamInfo::FilterState::ObjectFactory {
30
public:
31
15
  std::string name() const override { return std::string(DynamicHostFilterStateKey); }
32
  std::unique_ptr<StreamInfo::FilterState::Object>
33
1
  createFromBytes(absl::string_view data) const override {
34
1
    return std::make_unique<Router::StringAccessorImpl>(data);
35
1
  }
36
};
37
class DynamicPortObjectFactory : public StreamInfo::FilterState::ObjectFactory {
38
public:
39
15
  std::string name() const override { return std::string(DynamicPortFilterStateKey); }
40
  std::unique_ptr<StreamInfo::FilterState::Object>
41
2
  createFromBytes(absl::string_view data) const override {
42
2
    uint32_t port = 0;
43
2
    if (absl::SimpleAtoi(data, &port)) {
44
1
      return std::make_unique<StreamInfo::UInt32AccessorImpl>(port);
45
1
    }
46
1
    return nullptr;
47
2
  }
48
};
49

            
50
75
bool isProxying(StreamInfo::StreamInfo* stream_info) {
51
  // Should not hit this call unless the flag is enabled.
52
75
  ASSERT(Runtime::runtimeFeatureEnabled(
53
75
      "envoy.reloadable_features.skip_dns_lookup_for_proxied_requests"));
54
75
  return stream_info && stream_info->filterState() &&
55
75
         stream_info->filterState()->hasData<Network::Http11ProxyInfoFilterState>(
56
75
             Network::Http11ProxyInfoFilterState::key());
57
75
}
58

            
59
} // namespace
60

            
61
// Register the filter state object factories so the dynamic host/port
// overrides can be rehydrated from serialized filter state.
REGISTER_FACTORY(DynamicHostObjectFactory, StreamInfo::FilterState::ObjectFactory);
REGISTER_FACTORY(DynamicPortObjectFactory, StreamInfo::FilterState::ObjectFactory);
63

            
64
// Builds a dynamic forward proxy cluster around a (possibly shared) DNS cache.
// When `sub_clusters_config` is present, resolved hosts are materialized as
// independent STRICT_DNS sub clusters, reaped after `sub_cluster_ttl_` of
// idleness. Construction errors are reported through `creation_status`
// (error-code style; the factory checks it right after `new`).
Cluster::Cluster(
    const envoy::config::cluster::v3::Cluster& cluster,
    Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr&& cache,
    const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& config,
    Upstream::ClusterFactoryContext& context,
    Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr&& cache_manager,
    absl::Status& creation_status)
    : Upstream::BaseDynamicClusterImpl(cluster, context, creation_status),
      dns_cache_manager_(std::move(cache_manager)), dns_cache_(std::move(cache)),
      // Subscribe to DNS cache add/update/remove events for the cache lifetime.
      update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)),
      local_info_(context.serverFactoryContext().localInfo()),
      main_thread_dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
      // Kept verbatim so sub cluster configs can be derived from it later.
      orig_cluster_config_(cluster),
      allow_coalesced_connections_(config.allow_coalesced_connections()),
      time_source_(context.serverFactoryContext().timeSource()),
      cm_(context.serverFactoryContext().clusterManager()),
      // Defaults: at most 1024 sub clusters, 300s (5 min) idle TTL.
      max_sub_clusters_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.sub_clusters_config(), max_sub_clusters, 1024)),
      sub_cluster_ttl_(
          PROTOBUF_GET_MS_OR_DEFAULT(config.sub_clusters_config(), sub_cluster_ttl, 300000)),
      sub_cluster_lb_policy_(config.sub_clusters_config().lb_policy()),
      enable_sub_cluster_(config.has_sub_clusters_config()) {

  if (enable_sub_cluster_) {
    // Periodic main-thread sweep that removes idle sub clusters; re-armed by
    // checkIdleSubCluster() itself after each pass.
    idle_timer_ = main_thread_dispatcher_.createTimer([this]() { checkIdleSubCluster(); });
    idle_timer_->enableTimer(sub_cluster_ttl_);
  }
}
92

            
93
129
// Tears down the idle-sweep timer and, unless the cluster manager is already
// shutting down, removes every remaining sub cluster from both the local map
// and the cluster manager to avoid leaking them.
Cluster::~Cluster() {
  if (enable_sub_cluster_) {
    idle_timer_->disableTimer();
    idle_timer_.reset();
  }
  // During full shutdown the cluster manager drops all clusters itself.
  if (cm_.isShutdown()) {
    return;
  }
  // Should remove all sub clusters, otherwise, might be memory leaking.
  // The lock is not strictly required during destruction (no concurrent
  // access is possible), but it satisfies the thread-safety annotations on
  // cluster_map_.
  absl::WriterMutexLock lock{cluster_map_lock_};
  for (auto it = cluster_map_.cbegin(); it != cluster_map_.cend();) {
    auto cluster_name = it->first;
    ENVOY_LOG(debug, "cluster='{}' removing from cluster_map & cluster manager", cluster_name);
    // Post-increment erase keeps the iterator valid while removing the entry.
    cluster_map_.erase(it++);
    cm_.removeCluster(cluster_name, true);
  }
}
111

            
112
126
void Cluster::startPreInit() {
113
  // If we are attaching to a pre-populated cache we need to initialize our hosts.
114
126
  std::unique_ptr<Upstream::HostVector> hosts_added;
115
126
  dns_cache_->iterateHostMap(
116
126
      [&](absl::string_view host, const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
117
18
        absl::Status status = addOrUpdateHost(host, info, hosts_added);
118
18
        if (!status.ok()) {
119
          ENVOY_LOG(warn, "Failed to add host from cache: {}", status.message());
120
        }
121
18
      });
122
126
  if (hosts_added) {
123
17
    updatePriorityState(*hosts_added, {});
124
17
  }
125
126
  onPreInitComplete();
126
126
}
127

            
128
4
bool Cluster::touch(const std::string& cluster_name) {
129
4
  absl::ReaderMutexLock lock{cluster_map_lock_};
130
4
  const auto cluster_it = cluster_map_.find(cluster_name);
131
4
  if (cluster_it != cluster_map_.end()) {
132
4
    cluster_it->second->touch();
133
4
    return true;
134
4
  }
135
  ENVOY_LOG(debug, "cluster='{}' has been removed while touching", cluster_name);
136
  return false;
137
4
}
138

            
139
2
// Main-thread periodic sweep: removes every sub cluster whose idle TTL has
// expired from both the local map and the cluster manager, then re-arms the
// timer for the next pass.
void Cluster::checkIdleSubCluster() {
  ASSERT(main_thread_dispatcher_.isThreadSafe());
  {
    // TODO: try read lock first.
    absl::WriterMutexLock lock{cluster_map_lock_};
    for (auto it = cluster_map_.cbegin(); it != cluster_map_.cend();) {
      if (it->second->checkIdle()) {
        auto cluster_name = it->first;
        ENVOY_LOG(debug, "cluster='{}' removing from cluster_map & cluster manager", cluster_name);
        // Post-increment erase keeps the iterator valid while removing.
        cluster_map_.erase(it++);
        cm_.removeCluster(cluster_name, true);
      } else {
        ++it;
      }
    }
  }
  // Re-arm outside the lock; the sweep runs every sub_cluster_ttl_.
  idle_timer_->enableTimer(sub_cluster_ttl_);
}
157

            
158
// Returns {ok, optional config}. `ok` is false only when the sub cluster limit
// has been reached. The config is populated only when a new sub cluster entry
// was created here; the caller is responsible for adding that config to the
// cluster manager. If the sub cluster already exists, its idle timestamp is
// refreshed and no config is returned.
std::pair<bool, absl::optional<envoy::config::cluster::v3::Cluster>>
Cluster::createSubClusterConfig(const std::string& cluster_name, const std::string& host,
                                const int port) {
  {
    absl::WriterMutexLock lock{cluster_map_lock_};
    const auto cluster_it = cluster_map_.find(cluster_name);
    if (cluster_it != cluster_map_.end()) {
      // Already tracked: just refresh the idle TTL.
      cluster_it->second->touch();
      return std::make_pair(true, absl::nullopt);
    }
    if (cluster_map_.size() >= max_sub_clusters_) {
      ENVOY_LOG(debug, "cluster='{}' create failed due to max sub cluster limitation",
                cluster_name);
      return std::make_pair(false, absl::nullopt);
    }
    cluster_map_.emplace(cluster_name, std::make_shared<ClusterInfo>(cluster_name, *this));
  }

  // Inherit configuration from the parent DFP cluster.
  envoy::config::cluster::v3::Cluster config = orig_cluster_config_;

  // Overwrite the type: the sub cluster resolves its single host via STRICT_DNS
  // instead of the dynamic forward proxy cluster type.
  config.set_name(cluster_name);
  config.clear_cluster_type();
  config.set_lb_policy(sub_cluster_lb_policy_);
  config.set_type(
      envoy::config::cluster::v3::Cluster_DiscoveryType::Cluster_DiscoveryType_STRICT_DNS);

  // Set endpoint: a single host:port endpoint replacing any inherited ones.
  auto load_assignments = config.mutable_load_assignment();
  load_assignments->set_cluster_name(cluster_name);
  load_assignments->clear_endpoints();

  auto socket_address = load_assignments->add_endpoints()
                            ->add_lb_endpoints()
                            ->mutable_endpoint()
                            ->mutable_address()
                            ->mutable_socket_address();
  socket_address->set_address(host);
  socket_address->set_port_value(port);

  return std::make_pair(true, absl::make_optional(config));
}
201

            
202
// Sub-cluster mode host selection: derives the sub cluster name from the
// authority ("DFPCluster:<host>:<port>") and delegates to that thread-local
// cluster's load balancer. Returns no host when the sub cluster has already
// been reaped (e.g. the TTL was too short).
Upstream::HostSelectionResponse Cluster::chooseHost(absl::string_view host,
                                                    Upstream::LoadBalancerContext* context) const {
  // The default port tracks this cluster's transport security: 443 for TLS
  // upstreams, 80 for plaintext.
  const bool secure_transport = info_->transportSocketMatcher()
                                    .resolve(nullptr, nullptr)
                                    .factory_.implementsSecureTransport();
  const uint16_t default_port = secure_transport ? 443 : 80;

  const auto host_attributes = Http::Utility::parseAuthority(host);
  const std::string dynamic_host(host_attributes.host_);
  const auto port = host_attributes.port_.value_or(default_port);

  // cluster name is prefix + host + port
  const std::string cluster_name = "DFPCluster:" + dynamic_host + ":" + std::to_string(port);

  // try again to get the sub cluster.
  auto* thread_local_cluster = cm_.getThreadLocalCluster(cluster_name);
  if (thread_local_cluster == nullptr) {
    ENVOY_LOG(debug, "cluster='{}' get thread local failed, too short ttl?", cluster_name);
    return {nullptr};
  }

  return thread_local_cluster->loadBalancer().chooseHost(context);
}
227

            
228
// Tracks one DFP sub cluster. Marks the entry as freshly used on creation so
// the idle-TTL sweep does not immediately reap it.
// Fix: the by-value `cluster_name` parameter was copied into the member;
// move it instead (clang-tidy performance-unnecessary-value-param).
Cluster::ClusterInfo::ClusterInfo(std::string cluster_name, Cluster& parent)
    : cluster_name_(std::move(cluster_name)), parent_(parent) {
  ENVOY_LOG(debug, "cluster='{}' ClusterInfo created", cluster_name_);
  touch();
}
233

            
234
20
void Cluster::ClusterInfo::touch() {
235
20
  ENVOY_LOG(debug, "cluster='{}' updating last used time", cluster_name_);
236
20
  last_used_time_ = parent_.time_source_.monotonicTime().time_since_epoch();
237
20
}
238

            
239
// checkIdle run in the main thread.
240
2
bool Cluster::ClusterInfo::checkIdle() {
241
2
  ASSERT(parent_.main_thread_dispatcher_.isThreadSafe());
242

            
243
2
  const std::chrono::steady_clock::duration now_duration =
244
2
      parent_.main_thread_dispatcher_.timeSource().monotonicTime().time_since_epoch();
245
2
  auto last_used_time = last_used_time_.load();
246
2
  ENVOY_LOG(debug, "cluster='{}' TTL check: now={} last_used={} TTL {}", cluster_name_,
247
2
            now_duration.count(), last_used_time.count(), parent_.sub_cluster_ttl_.count());
248

            
249
2
  if ((now_duration - last_used_time) > parent_.sub_cluster_ttl_) {
250
2
    ENVOY_LOG(debug, "cluster='{}' TTL expired", cluster_name_);
251
2
    return true;
252
2
  }
253
  return false;
254
2
}
255

            
256
// Inserts a new logical host for `host`, or — if one already exists — swaps
// its address(es) in place. On brand-new insertion the host is appended to
// `hosts_added` (allocating the vector on first use) so the caller knows a
// priority-set rebuild is required; in-place address updates leave
// `hosts_added` untouched. Returns non-OK only if LogicalHost creation fails.
absl::Status Cluster::addOrUpdateHost(
    absl::string_view host,
    const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
    std::unique_ptr<Upstream::HostVector>& hosts_added) {
  Upstream::LogicalHostSharedPtr emplaced_host;
  {
    absl::WriterMutexLock lock{host_map_lock_};

    // NOTE: Right now we allow a DNS cache to be shared between multiple clusters. Though we have
    // connection/request circuit breakers on the cluster, we don't have any way to control the
    // maximum hosts on a cluster. We currently assume that host data shared via shared pointer is
    // a marginal memory cost above that already used by connections and requests, so relying on
    // connection/request circuit breakers is sufficient. We may have to revisit this in the
    // future.
    const auto host_map_it = host_map_.find(host);
    if (host_map_it != host_map_.end()) {
      // If we only have an address change, we can do that swap inline without any other updates.
      // The appropriate R/W locking is in place to allow this. The details of this locking are:
      //  - Hosts are not thread local, they are global.
      //  - We take a read lock when reading the address and a write lock when changing it.
      //  - Address updates are very rare.
      //  - Address reads are only done when a connection is being made and a "real" host
      //    description is created or the host is queried via the admin endpoint. Both of
      //    these operations are relatively rare and the read lock is held for a short period
      //    of time.
      //
      // TODO(mattklein123): Right now the dynamic forward proxy / DNS cache works similar to how
      //                     logical DNS works, meaning that we only store a single address per
      //                     resolution. It would not be difficult to also expose strict DNS
      //                     semantics, meaning the cache would expose multiple addresses and the
      //                     cluster would create multiple logical hosts based on those addresses.
      //                     We will leave this is a follow up depending on need.
      ASSERT(host_info == host_map_it->second.shared_host_info_);
      ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
      host_map_it->second.logical_host_->setNewAddresses(
          host_info->address(), host_info->addressList(), dummy_lb_endpoint_);
      return absl::OkStatus();
    }

    ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);
    auto host_or_error = Upstream::LogicalHost::create(
        info(), std::string{host}, host_info->address(), host_info->addressList(),
        dummy_locality_lb_endpoint_, dummy_lb_endpoint_, nullptr);
    RETURN_IF_NOT_OK_REF(host_or_error.status());

    // Keep a reference to the emplaced host so it can be reported to the
    // caller after the write lock is released.
    emplaced_host =
        host_map_
            .try_emplace(host, host_info,
                         std::shared_ptr<Upstream::LogicalHost>(host_or_error->release()))
            .first->second.logical_host_;
  }

  ASSERT(emplaced_host);
  // Lazily allocate the output vector so callers can distinguish "no new
  // hosts" (null) from "new hosts added" (non-null, non-empty).
  if (hosts_added == nullptr) {
    hosts_added = std::make_unique<Upstream::HostVector>();
  }
  hosts_added->emplace_back(emplaced_host);
  return absl::OkStatus();
}
315

            
316
// DNS cache callback: folds a resolved host into this cluster and, when a
// brand-new host was inserted (rather than an in-place address update),
// rebuilds the priority set to publish it.
absl::Status Cluster::onDnsHostAddOrUpdate(
    const std::string& host,
    const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
  ENVOY_LOG(debug, "Adding host info for {}", host);

  // `added` stays null for pure address updates; non-null means new hosts.
  std::unique_ptr<Upstream::HostVector> added;
  RETURN_IF_NOT_OK(addOrUpdateHost(host, host_info, added));
  if (added) {
    ASSERT(!added->empty());
    updatePriorityState(*added, {});
  }
  return absl::OkStatus();
}
329

            
330
// Rebuilds priority 0 of the cluster's priority set from the full host map and
// notifies subscribers with the `hosts_added`/`hosts_removed` deltas. All DFP
// hosts live in a single priority and a single (dummy) locality.
void Cluster::updatePriorityState(const Upstream::HostVector& hosts_added,
                                  const Upstream::HostVector& hosts_removed) {
  Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
  priority_state_manager.initializePriorityFor(dummy_locality_lb_endpoint_);
  {
    // Snapshot the host map under the read lock; registration only mutates
    // the local priority state manager, so the lock is released before the
    // cluster-wide update below.
    absl::ReaderMutexLock lock{host_map_lock_};
    for (const auto& host : host_map_) {
      priority_state_manager.registerHostForPriority(host.second.logical_host_,
                                                     dummy_locality_lb_endpoint_);
    }
  }
  priority_state_manager.updateClusterPrioritySet(
      0, std::move(priority_state_manager.priorityState()[0].first), hosts_added, hosts_removed,
      absl::nullopt, absl::nullopt, absl::nullopt);
}
345

            
346
11
// DNS cache callback: drops the removed/expired host from the local host map
// and publishes the removal. The write lock is deliberately released before
// updatePriorityState(), which re-acquires the read lock itself.
void Cluster::onDnsHostRemove(const std::string& host) {
  Upstream::HostVector hosts_removed;
  {
    absl::WriterMutexLock lock{host_map_lock_};
    const auto host_map_it = host_map_.find(host);
    // The cache only notifies removal for hosts it previously announced.
    ASSERT(host_map_it != host_map_.end());
    hosts_removed.emplace_back(host_map_it->second.logical_host_);
    host_map_.erase(host);
    ENVOY_LOG(debug, "removing dfproxy cluster host '{}'", host);
  }
  updatePriorityState({}, hosts_removed);
}
358

            
359
// Resolves the target host for a request. The host is taken (in order of
// precedence) from: the dynamic-host filter state, the downstream Host header,
// or the downstream connection's SNI. The port comes from the dynamic-port
// filter state when valid, else defaults by transport security (443/80). In
// sub-cluster mode selection is delegated to the per-host sub cluster;
// otherwise the host is looked up in the local map and, when permitted by
// runtime flags, resolved asynchronously through the DNS cache.
Upstream::HostSelectionResponse
Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
  if (!context) {
    return {nullptr};
  }

  // For host lookup, we need to make sure to match the host of any DNS cache
  // insert. Two code points currently do DNS cache insert: the http DFP filter,
  // which inserts for HTTP traffic, and sets port based on the cluster's
  // security level, and the SNI DFP network filter which sets port based on
  // stream metadata, or configuration (which is then added as stream metadata).
  const bool is_secure = cluster_.info()
                             ->transportSocketMatcher()
                             .resolve(nullptr, nullptr)
                             .factory_.implementsSecureTransport();
  const uint32_t default_port = is_secure ? 443 : 80;

  const auto* stream_info = context->requestStreamInfo();
  const Router::StringAccessor* dynamic_host_filter_state = nullptr;
  if (stream_info) {
    dynamic_host_filter_state = stream_info->filterState().getDataReadOnly<Router::StringAccessor>(
        DynamicHostFilterStateKey);
  }

  absl::string_view raw_host;
  uint32_t port = default_port;

  if (dynamic_host_filter_state) {
    // Use dynamic host from filter state if available.
    raw_host = dynamic_host_filter_state->asString();

    // Try to get port from filter state first.
    const StreamInfo::UInt32Accessor* dynamic_port_filter_state =
        stream_info->filterState().getDataReadOnly<StreamInfo::UInt32Accessor>(
            DynamicPortFilterStateKey);

    // Only accept a port in the valid (1, 65535] range.
    if (dynamic_port_filter_state != nullptr && dynamic_port_filter_state->value() > 0 &&
        dynamic_port_filter_state->value() <= 65535) {
      // Use dynamic port from filter state if available.
      port = dynamic_port_filter_state->value();
    }
    // If no dynamic port is in filter state, we just use the default_port.
  } else if (context->downstreamHeaders()) {
    raw_host = context->downstreamHeaders()->getHostValue();
    // When no filter state is used, we let ``normalizeHostForDfp()`` handle the port parsing.
  } else if (context->downstreamConnection()) {
    raw_host = context->downstreamConnection()->requestedServerName();
  }

  // We always check for dynamic port from filter state, even if the host is not from filter state.
  // This is to maintain the backward compatibility with the existing SNI filter behavior.
  if (stream_info && !dynamic_host_filter_state) {
    const StreamInfo::UInt32Accessor* dynamic_port_filter_state =
        stream_info->filterState().getDataReadOnly<StreamInfo::UInt32Accessor>(
            DynamicPortFilterStateKey);

    if (dynamic_port_filter_state != nullptr && dynamic_port_filter_state->value() > 0 &&
        dynamic_port_filter_state->value() <= 65535) {
      port = dynamic_port_filter_state->value();
    }
  }

  if (raw_host.empty()) {
    ENVOY_LOG(debug, "host empty");
    return {nullptr, "empty_host_header"};
  }

  std::string hostname =
      Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);

  if (cluster_.enableSubCluster()) {
    return cluster_.chooseHost(hostname, context);
  }
  Upstream::HostConstSharedPtr host = findHostByName(hostname);
  // Force a re-resolution for known-but-never-used hosts when both runtime
  // flags are enabled (the cached address may be stale).
  bool force_refresh =
      Runtime::runtimeFeatureEnabled("envoy.reloadable_features.reresolve_if_no_connections") &&
      Runtime::runtimeFeatureEnabled("envoy.reloadable_features.dfp_cluster_resolves_hosts") &&
      host && !host->used();
  if ((host && !force_refresh) ||
      !Runtime::runtimeFeatureEnabled("envoy.reloadable_features.dfp_cluster_resolves_hosts")) {
    return {host};
  }

  // If the host is not found, the DFP cluster can now do asynchronous lookup.
  Upstream::ResourceAutoIncDecPtr handle = cluster_.dns_cache_->canCreateDnsRequest();

  // Return an immediate failure if there's too many requests already.
  if (!handle) {
    return {nullptr, "dns_cache_pending_requests_overflow"};
  }

  // Attempt to load the host from cache. Generally this will result in async
  // resolution so create a DFPHostSelectionHandle to handle this.
  std::unique_ptr<DFPHostSelectionHandle> cancelable =
      std::make_unique<DFPHostSelectionHandle>(context, cluster_, hostname);
  bool is_proxying = isProxying(context->requestStreamInfo());
  auto result = cluster_.dns_cache_->loadDnsCacheEntryWithForceRefresh(raw_host, port, is_proxying,
                                                                       force_refresh, *cancelable);
  switch (result.status_) {
  case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::InCache:
    return {nullptr, result.host_info_.has_value() ? result.host_info_.value()->details() : ""};
  case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::Loading:
    // Here the DFP kicks off an async lookup. The DFPHostSelectionHandle will
    // call onLoadDnsCacheComplete and onAsyncHostSelection unless the
    // resolution is canceled by the stream.
    cancelable->setHandle(std::move(result.handle_));
    cancelable->setAutoDec(std::move(handle));
    return Upstream::HostSelectionResponse{nullptr, std::move(cancelable)};
  case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::Overflow:
    // In the case of overflow, return immediate failure.
    ENVOY_LOG(debug, "host {} lookup failed due to overflow", hostname);
    break; // fall through
  }
  return {nullptr, "dns_cache_overflow"};
}
474

            
475
129
// Thin delegate to the owning cluster's host-map lookup.
Upstream::HostConstSharedPtr Cluster::LoadBalancer::findHostByName(const std::string& host) const {
  return cluster_.findHostByName(host);
}
478

            
479
192
// Looks up a previously resolved host. Returns nullptr when the host is
// unknown or currently unhealthy; on a hit, refreshes the DNS cache entry so
// its TTL does not expire it.
Upstream::HostConstSharedPtr Cluster::findHostByName(const std::string& host) const {
  absl::ReaderMutexLock lock{host_map_lock_};
  const auto it = host_map_.find(host);
  if (it == host_map_.end()) {
    ENVOY_LOG(debug, "host {} not found", host);
    return nullptr;
  }
  const auto& entry = it->second;
  if (entry.logical_host_->coarseHealth() == Upstream::Host::Health::Unhealthy) {
    ENVOY_LOG(debug, "host {} is unhealthy", host);
    return nullptr;
  }
  // Mark the cache entry as recently used so the DNS cache keeps it alive.
  entry.shared_host_info_->touch();
  return entry.logical_host_;
}
496

            
497
// Connection coalescing: reuse an already-open pooled connection for `host`
// when one of its peer-certificate DNS SANs covers the hostname. Returns
// nullopt when no suitable connection exists.
absl::optional<Upstream::SelectedPoolAndConnection>
Cluster::LoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
                                                const Upstream::Host& host,
                                                std::vector<uint8_t>& hash_key) {
  // A hostname is required to match against certificate SANs.
  const std::string& hostname = host.hostname();
  if (hostname.empty()) {
    return absl::nullopt;
  }

  const LookupKey key = {hash_key, *host.address()};
  const auto entry = connection_info_map_.find(key);
  if (entry == connection_info_map_.end()) {
    return absl::nullopt;
  }

  // Pick the first tracked connection whose certificate covers this hostname.
  for (const auto& candidate : entry->second) {
    Envoy::Ssl::ConnectionInfoConstSharedPtr ssl = candidate.connection_->ssl();
    // Only TLS connections are ever tracked (see onConnectionOpen()).
    ASSERT(ssl);
    for (const std::string& san : ssl->dnsSansPeerCertificate()) {
      if (Extensions::TransportSockets::Tls::Utility::dnsNameMatch(hostname, san)) {
        return Upstream::SelectedPoolAndConnection{*candidate.pool_, *candidate.connection_};
      }
    }
  }

  return absl::nullopt;
}
524

            
525
// Exposes connection lifetime callbacks only when connection coalescing is
// enabled; otherwise there is nothing to track.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
Cluster::LoadBalancer::lifetimeCallbacks() {
  if (cluster_.allowCoalescedConnections()) {
    return makeOptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>(*this);
  }
  return {};
}
532

            
533
void Cluster::LoadBalancer::onConnectionOpen(Envoy::Http::ConnectionPool::Instance& pool,
534
                                             std::vector<uint8_t>& hash_key,
535
9
                                             const Network::Connection& connection) {
536
  // Only coalesce connections that are over TLS.
537
9
  if (!connection.ssl()) {
538
1
    return;
539
1
  }
540
8
  const std::string alpn = connection.nextProtocol();
541
8
  if (alpn != Http::Utility::AlpnNames::get().Http2 &&
542
8
      alpn != Http::Utility::AlpnNames::get().Http3) {
543
    // Only coalesce connections for HTTP/2 and HTTP/3.
544
1
    return;
545
1
  }
546
7
  const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
547
7
  ConnectionInfo info = {&pool, &connection};
548
7
  connection_info_map_[key].push_back(info);
549
7
}
550

            
551
void Cluster::LoadBalancer::onConnectionDraining(Envoy::Http::ConnectionPool::Instance& pool,
552
                                                 std::vector<uint8_t>& hash_key,
553
1
                                                 const Network::Connection& connection) {
554
1
  const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
555
1
  connection_info_map_[key].erase(
556
1
      std::remove_if(connection_info_map_[key].begin(), connection_info_map_[key].end(),
557
1
                     [&pool, &connection](const ConnectionInfo& info) {
558
1
                       return (info.pool_ == &pool && info.connection_ == &connection);
559
1
                     }),
560
1
      connection_info_map_[key].end());
561
1
}
562

            
563
// Factory entry point: validates the proto config, obtains (or creates) the
// shared DNS cache, constructs the Cluster, registers it in the DFP cluster
// store, and returns it paired with its thread-aware load balancer.
// Returns InvalidArgumentError when auto_sni/auto_san_validation are disabled
// without allow_insecure_cluster_options, or when sub clusters are configured
// with the unsupported CLUSTER_PROVIDED lb_policy.
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
ClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& proto_config,
    Upstream::ClusterFactoryContext& context) {

  Extensions::Common::DynamicForwardProxy::DnsCacheManagerFactoryImpl cache_manager_factory(
      context.serverFactoryContext(), context.messageValidationVisitor());

  envoy::config::cluster::v3::Cluster cluster_config = cluster;
  if (!cluster_config.has_upstream_http_protocol_options()) {
    // This sets defaults which will only apply if using old style http config.
    // They will be a no-op if typed_extension_protocol_options are used for
    // http config.
    cluster_config.mutable_upstream_http_protocol_options()->set_auto_sni(true);
    cluster_config.mutable_upstream_http_protocol_options()->set_auto_san_validation(true);
  }

  Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr cache_manager =
      cache_manager_factory.get();
  auto dns_cache_or_error = cache_manager->getCache(proto_config.dns_cache_config());
  RETURN_IF_NOT_OK_REF(dns_cache_or_error.status());

  // Two-phase construction: the constructor reports failures via
  // creation_status instead of throwing.
  absl::Status creation_status = absl::OkStatus();
  auto new_cluster = std::shared_ptr<Cluster>(
      new Cluster(cluster_config, std::move(dns_cache_or_error.value()), proto_config, context,
                  std::move(cache_manager), creation_status));
  RETURN_IF_NOT_OK(creation_status);

  // Make the cluster discoverable by name for the DFP filters.
  Extensions::Common::DynamicForwardProxy::DFPClusterStoreFactory cluster_store_factory(
      context.serverFactoryContext().singletonManager());
  cluster_store_factory.get()->save(new_cluster->info()->name(), new_cluster);

  const auto& options = new_cluster->info()->httpProtocolOptions().upstreamHttpProtocolOptions();

  // By default require auto_sni + auto_san_validation so the proxied TLS
  // handshake actually validates the dynamic upstream host.
  if (!proto_config.allow_insecure_cluster_options()) {
    if (!options.has_value() ||
        (!options.value().auto_sni() || !options.value().auto_san_validation())) {
      return absl::InvalidArgumentError(
          "dynamic_forward_proxy cluster must have auto_sni and auto_san_validation true unless "
          "allow_insecure_cluster_options is set.");
    }
  }
  if (proto_config.has_sub_clusters_config() &&
      proto_config.sub_clusters_config().lb_policy() ==
          envoy::config::cluster::v3::Cluster_LbPolicy::Cluster_LbPolicy_CLUSTER_PROVIDED) {
    return absl::InvalidArgumentError(
        "unsupported lb_policy 'CLUSTER_PROVIDED' in sub_cluster_config");
  }

  auto lb = std::make_unique<Cluster::ThreadAwareLoadBalancer>(*new_cluster);
  return std::make_pair(new_cluster, std::move(lb));
}
616

            
617
// Register this cluster type with the cluster factory registry.
REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
618

            
619
} // namespace DynamicForwardProxy
620
} // namespace Clusters
621
} // namespace Extensions
622
} // namespace Envoy