1
#include "source/extensions/clusters/original_dst/original_dst_cluster.h"
2

            
3
#include <chrono>
4
#include <list>
5
#include <string>
6
#include <vector>
7

            
8
#include "envoy/config/cluster/v3/cluster.pb.h"
9
#include "envoy/config/core/v3/base.pb.h"
10
#include "envoy/config/core/v3/health_check.pb.h"
11
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
12
#include "envoy/stats/scope.h"
13

            
14
#include "source/common/http/headers.h"
15
#include "source/common/network/address_impl.h"
16
#include "source/common/network/filter_state_dst_address.h"
17
#include "source/common/network/utility.h"
18
#include "source/common/protobuf/protobuf.h"
19
#include "source/common/protobuf/utility.h"
20
#include "source/common/runtime/runtime_features.h"
21

            
22
namespace Envoy {
23
namespace Upstream {
24

            
25
54
/**
 * Releases this handle's reference to the cluster through the cluster's own dispatcher_
 * (initialized from the main thread dispatcher in the OriginalDstCluster constructor) instead
 * of dropping it inline in the destructor.
 */
OriginalDstClusterHandle::~OriginalDstClusterHandle() {
  // Pull the reference out of the member and leave the member empty.
  std::shared_ptr<OriginalDstCluster> released = std::move(cluster_);
  cluster_.reset();
  // The dispatcher reference must be taken before `released` is moved into the closure.
  Event::Dispatcher& target_dispatcher = released->dispatcher_;
  // The posted callback holds the reference until it runs and then drops it.
  target_dispatcher.post([released = std::move(released)]() mutable { released.reset(); });
}
31

            
32
83
/**
 * Chooses the upstream host for the destination address associated with `context`.
 *
 * The destination is resolved from the first source that yields an address, in this order:
 * filter state override, dynamic metadata override, request header override, and finally the
 * downstream connection's local address when it is reported as restored. If a port override
 * is configured it replaces the port of the resolved address.
 *
 * A host already present in host_map_ for that address is marked as used and returned.
 * Otherwise, when the address is an IP address, a new host is created, returned right away,
 * and handed to the cluster via a callback posted to the cluster's dispatcher.
 *
 * @param context the load balancer context; may be nullptr.
 * @return the selected host, or a null host when no destination could be determined or the
 *         destination is not an IP address.
 */
HostSelectionResponse OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
  if (context != nullptr) {
    // Filter state has the highest precedence.
    Network::Address::InstanceConstSharedPtr target = filterStateOverrideHost(context);

    // Next in line is the dynamic metadata override.
    if (!target) {
      target = metadataOverrideHost(context);
    }

    // Then the request header override.
    if (!target) {
      target = requestOverrideHost(context);
    }

    // Last resort: the downstream connection's local address, which is the original
    // destination address whenever localAddressRestored() returns 'true'.
    if (!target) {
      const Network::Connection* downstream = context->downstreamConnection();
      if (downstream != nullptr && downstream->connectionInfoProvider().localAddressRestored()) {
        target = downstream->connectionInfoProvider().localAddress();
      }
    }

    if (target != nullptr && port_override_.has_value()) {
      target = Network::Utility::getAddressWithPort(*target, port_override_.value());
    }

    if (target != nullptr) {
      const Network::Address::Instance& target_addr = *target;
      // Reuse the host if one is already known for this destination address.
      auto existing = host_map_->find(target_addr.asString());
      if (existing != host_map_->end()) {
        HostConstSharedPtr known_host = existing->second->host_;
        ENVOY_LOG(trace, "Using existing host {} {}.", *known_host,
                  known_host->address()->asString());
        existing->second->used_ = true;
        return known_host;
      }

      // No known host: a new one can only be created for IP addresses.
      const Network::Address::Ip* target_ip = target_addr.ip();
      if (target_ip == nullptr) {
        ENVOY_LOG(debug, "Failed to create host for {}.", target_addr.asString());
      } else {
        Network::Address::InstanceConstSharedPtr new_host_addr(
            Network::Utility::copyInternetAddressAndPort(*target_ip));
        // Build a host that is usable immediately by the caller.
        auto cluster_info = parent_->cluster_->info();
        HostSharedPtr new_host(std::shared_ptr<HostImpl>(THROW_OR_RETURN_VALUE(
            HostImpl::create(cluster_info, cluster_info->name() + target_addr.asString(),
                             std::move(new_host_addr), nullptr, nullptr, 1,
                             std::make_shared<envoy::config::core::v3::Locality>(),
                             envoy::config::endpoint::v3::Endpoint::HealthCheckConfig()
                                 .default_instance(),
                             0, envoy::config::core::v3::UNKNOWN),
            std::unique_ptr<HostImpl>)));
        ENVOY_LOG(debug, "Created host {} {}.", *new_host, new_host->address()->asString());

        // Notify the cluster about the new host. A member cannot be captured by value in a
        // lambda, so a local weak reference to the handle is captured instead.
        std::weak_ptr<OriginalDstClusterHandle> weak_handle = parent_;
        parent_->cluster_->dispatcher_.post([weak_handle, new_host]() mutable {
          // The handle may already be gone by the time this callback runs.
          if (std::shared_ptr<OriginalDstClusterHandle> handle = weak_handle.lock()) {
            handle->cluster_->addHost(new_host);
          }
        });
        return {new_host};
      }
    }
  }
  // TODO(ramaraochavali): add a stat and move this log line to debug.
  ENVOY_LOG(warn, "original_dst_load_balancer: No downstream connection or no original_dst.");
  return {nullptr};
}
104

            
105
/**
 * Looks for a destination address object stored in filter state under
 * OriginalDstClusterFilterStateKey.
 *
 * The request stream info is checked first, then the downstream connection's stream info
 * (when a downstream connection exists).
 *
 * @param context the load balancer context; dereferenced unconditionally.
 * @return the address held by the first matching filter state object, or nullptr if neither
 *         stream info carries one.
 */
Network::Address::InstanceConstSharedPtr
OriginalDstCluster::LoadBalancer::filterStateOverrideHost(LoadBalancerContext* context) {
  // Candidates in lookup order; either entry may be null.
  const auto candidates = {
      const_cast<const StreamInfo::StreamInfo*>(context->requestStreamInfo()),
      context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
  for (const auto candidate : candidates) {
    if (candidate != nullptr) {
      const auto* override_object =
          candidate->filterState().getDataReadOnly<Network::AddressObject>(
              OriginalDstClusterFilterStateKey);
      if (override_object != nullptr) {
        return override_object->address();
      }
    }
  }
  return nullptr;
}
122

            
123
/**
 * Resolves the destination address from the configured override request header.
 *
 * @param context the load balancer context; dereferenced once a header name is configured.
 * @return the parsed address, or nullptr when no header name is configured, there are no
 *         downstream headers, the header is absent, or its first value does not parse as an
 *         IP address and port. A parse failure also increments the cluster's
 *         original_dst_host_invalid_ stat.
 */
Network::Address::InstanceConstSharedPtr
OriginalDstCluster::LoadBalancer::requestOverrideHost(LoadBalancerContext* context) {
  if (!http_header_name_.has_value()) {
    return nullptr;
  }
  const Http::HeaderMap* headers = context->downstreamHeaders();
  if (headers == nullptr) {
    return nullptr;
  }
  Http::HeaderMap::GetResult matches = headers->get(*http_header_name_);
  if (matches.empty()) {
    return nullptr;
  }
  // The header is implicitly untrusted, so per the API documentation only its first value is
  // considered.
  const std::string header_value(matches[0]->value().getStringView());
  Network::Address::InstanceConstSharedPtr parsed =
      Network::Utility::parseInternetAddressAndPortNoThrow(header_value, false);
  if (parsed != nullptr) {
    ENVOY_LOG(debug, "Using request override host {}.", header_value);
    return parsed;
  }
  ENVOY_LOG(debug, "original_dst_load_balancer: invalid override header value. {}",
            header_value);
  parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
  return nullptr;
}
150

            
151
/**
 * Resolves the destination address from dynamic metadata at the configured metadata key.
 *
 * The request stream info is inspected first, then the downstream connection's stream info
 * (when a downstream connection exists). The search stops at the first stream info whose
 * value at the key is a string, or a list whose first element is a string.
 *
 * @param context the load balancer context; dereferenced once a metadata key is configured.
 * @return the parsed address, or nullptr when no metadata key is configured, no string value
 *         is found, or the string does not parse as an IP address and port. A parse failure
 *         also increments the cluster's original_dst_host_invalid_ stat.
 */
Network::Address::InstanceConstSharedPtr
OriginalDstCluster::LoadBalancer::metadataOverrideHost(LoadBalancerContext* context) {
  if (!metadata_key_.has_value()) {
    return nullptr;
  }
  // Candidates in lookup order; either entry may be null.
  const auto candidates = {
      const_cast<const StreamInfo::StreamInfo*>(context->requestStreamInfo()),
      context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
  const Protobuf::Value* selected = nullptr;
  for (const auto candidate : candidates) {
    if (candidate == nullptr) {
      continue;
    }
    const auto& dynamic_metadata = candidate->dynamicMetadata();
    selected = &Config::Metadata::metadataValue(&dynamic_metadata, metadata_key_.value());
    // The key path may point at a list; in that case only its first element is considered.
    if (selected->kind_case() == Protobuf::Value::kListValue) {
      const auto& elements = selected->list_value().values();
      if (!elements.empty()) {
        selected = &(elements[0]);
      }
    }
    if (selected->kind_case() == Protobuf::Value::kStringValue) {
      break;
    }
  }
  // `selected` is still null when every candidate was null, and may be a non-string value
  // when no candidate produced a string.
  const bool found_string =
      selected != nullptr && selected->kind_case() == Protobuf::Value::kStringValue;
  if (!found_string) {
    return nullptr;
  }
  const std::string& metadata_value = selected->string_value();
  Network::Address::InstanceConstSharedPtr parsed =
      Network::Utility::parseInternetAddressAndPortNoThrow(metadata_value, false);
  if (parsed != nullptr) {
    ENVOY_LOG(debug, "Using metadata override host {}.", metadata_value);
    return parsed;
  }
  ENVOY_LOG(debug, "original_dst_load_balancer: invalid override metadata value. {}",
            metadata_value);
  parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
  return nullptr;
}
192

            
193
/**
 * Builds the cluster from its configuration.
 *
 * Binds the cluster to the main thread dispatcher, creates the periodic cleanup timer
 * (interval taken from `cleanup_interval`, defaulting to 5000 ms), starts with an empty host
 * map, reads the optional original-dst load balancer settings, and arms the cleanup timer.
 *
 * @param config the cluster configuration.
 * @param context the cluster factory context.
 * @param creation_status forwarded to ClusterImplBase.
 */
OriginalDstCluster::OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config,
                                       ClusterFactoryContext& context,
                                       absl::Status& creation_status)
    : ClusterImplBase(config, context, creation_status),
      dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
      cleanup_interval_ms_(
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))),
      cleanup_timer_(dispatcher_.createTimer([this] { cleanup(); })),
      host_map_(std::make_shared<HostMultiMap>()) {
  if (config.has_original_dst_lb_config()) {
    const auto& lb_settings = config.original_dst_lb_config();
    if (lb_settings.use_http_header()) {
      // An empty header name selects the default original-dst host header.
      if (lb_settings.http_header_name().empty()) {
        http_header_name_ = Http::Headers::get().EnvoyOriginalDstHost;
      } else {
        http_header_name_ = Http::LowerCaseString(lb_settings.http_header_name());
      }
    }
    if (lb_settings.has_metadata_key()) {
      metadata_key_ = Config::MetadataKey(lb_settings.metadata_key());
    }
    if (lb_settings.has_upstream_port_override()) {
      port_override_ = lb_settings.upstream_port_override().value();
    }
  }
  // Schedule the first cleanup pass; cleanup() re-arms the timer after each run.
  cleanup_timer_->enableTimer(cleanup_interval_ms_);
}
218

            
219
50
/**
 * Registers a host created by a load balancer with the cluster.
 *
 * A copy of the current host map is extended with the host and installed as the new host
 * map, then the host is appended to priority 0 of the priority set and reported as added.
 *
 * @param host the host to add; it is moved from when the priority set is updated.
 */
void OriginalDstCluster::addHost(HostSharedPtr& host) {
  const std::string address = host->address()->asString();
  HostMultiMapSharedPtr updated_map = std::make_shared<HostMultiMap>(*getCurrentHostMap());
  auto entry = updated_map->find(address);
  if (entry == updated_map->end()) {
    // The first worker that creates a host for the address defines the primary host
    // structure.
    updated_map->emplace(address, std::make_shared<HostsForAddress>(host));
  } else {
    // An existing entry means the worker that posted this host was working from a stale host
    // map. The host may already be in that worker's connection pools, so it is recorded in
    // the entry's hosts_ list and in the cluster priority set. The whole hosts_ list and the
    // primary host are later removed together once none of them is in use.
    entry->second->hosts_.push_back(host);
  }
  ENVOY_LOG(debug, "addHost() adding {} {}.", *host, address);
  setHostMap(updated_map);

  // Given the current config, only EDS clusters support multiple priorities.
  ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
  const auto& priority_zero = priority_set_.getOrCreateHostSet(0);
  HostVectorSharedPtr combined_hosts = std::make_shared<HostVector>(priority_zero.hosts());
  combined_hosts->emplace_back(host);
  priority_set_.updateHosts(
      0, HostSetImpl::partitionHosts(combined_hosts, HostsPerLocalityImpl::empty()), {},
      {std::move(host)}, {}, absl::nullopt, absl::nullopt);
}
247

            
248
16
void OriginalDstCluster::cleanup() {
249
16
  HostVectorSharedPtr keeping_hosts(new HostVector);
250
16
  HostVector to_be_removed;
251
16
  absl::flat_hash_set<absl::string_view> removed_addresses;
252
16
  auto host_map = getCurrentHostMap();
253
16
  if (!host_map->empty()) {
254
16
    ENVOY_LOG(trace, "Cleaning up stale original dst hosts.");
255
18
    for (const auto& [addr, hosts] : *host_map) {
256
      // Address is kept in the cluster if either of the two things happen:
257
      // 1) a host has been recently selected for the address; 2) none of the
258
      // hosts are currently in any of the connection pools.
259
      // The set of hosts for a single address are treated as a unit.
260
      //
261
      // Using the used_ bit is preserved for backwards compatibility and to
262
      // add a delay between load balancers choosing a host and grabbing a
263
      // handle on the host. This prevents the following interleaving:
264
      //
265
      // 1) worker 1: pools release host h
266
      // 2) worker 1: auto h = lb.chooseHost(&ctx);
267
      // 3) main: cleanup() // deletes h because h is not used by the pools
268
      // 4) worker 1: auto handle = h.acquireHandle();
269
      //
270
      // Because the duration between steps 2) and 4) is O(instructions), step
271
      // 3) will not delete h since it takes at least one cleanup_interval for
272
      // the host to set used_ bit for h to false.
273
18
      bool keep = false;
274
18
      if (hosts->used_) {
275
8
        keep = true;
276
8
        hosts->used_ = false; // Mark to be removed during the next round.
277
10
      } else if (Runtime::runtimeFeatureEnabled(
278
10
                     "envoy.reloadable_features.original_dst_rely_on_idle_timeout")) {
279
        // Check that all hosts (first, as well as others that may have been added concurrently)
280
        // are not in use by any connection pool.
281
10
        if (hosts->host_->used()) {
282
2
          keep = true;
283
8
        } else {
284
8
          for (const auto& host : hosts->hosts_) {
285
4
            if (host->used()) {
286
2
              keep = true;
287
2
              break;
288
2
            }
289
4
          }
290
8
        }
291
10
      }
292
18
      if (keep) {
293
12
        ENVOY_LOG(trace, "Keeping active address {}.", addr);
294
12
        keeping_hosts->emplace_back(hosts->host_);
295
12
        if (!hosts->hosts_.empty()) {
296
5
          keeping_hosts->insert(keeping_hosts->end(), hosts->hosts_.begin(), hosts->hosts_.end());
297
5
        }
298
12
      } else {
299
6
        ENVOY_LOG(trace, "Removing stale address {}.", addr);
300
6
        removed_addresses.insert(addr);
301
6
        to_be_removed.emplace_back(hosts->host_);
302
6
        if (!hosts->hosts_.empty()) {
303
2
          to_be_removed.insert(to_be_removed.end(), hosts->hosts_.begin(), hosts->hosts_.end());
304
2
        }
305
6
      }
306
18
    }
307
16
  }
308
16
  if (!to_be_removed.empty()) {
309
5
    HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*host_map);
310
6
    for (const auto& addr : removed_addresses) {
311
6
      new_host_map->erase(addr);
312
6
    }
313
5
    setHostMap(new_host_map);
314
5
    priority_set_.updateHosts(
315
5
        0, HostSetImpl::partitionHosts(keeping_hosts, HostsPerLocalityImpl::empty()), {}, {},
316
5
        to_be_removed, false, absl::nullopt);
317
5
  }
318

            
319
16
  cleanup_timer_->enableTimer(cleanup_interval_ms_);
320
16
}
321

            
322
/**
 * Validates the cluster configuration and creates the original destination cluster together
 * with its thread aware load balancer.
 *
 * @param cluster the cluster configuration.
 * @param context the cluster factory context.
 * @return the cluster and load balancer pair, or an InvalidArgument status when the LB policy
 *         is not CLUSTER_PROVIDED, a load assignment is configured, or http_header_name is
 *         set while use_http_header is false. A non-OK status reported by the cluster
 *         constructor is returned as well.
 */
absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
OriginalDstClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
                                             ClusterFactoryContext& context) {
  if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
    return absl::InvalidArgumentError(
        fmt::format("cluster: LB policy {} is not valid for Cluster type {}. Only "
                    "'CLUSTER_PROVIDED' is allowed with cluster type 'ORIGINAL_DST'",
                    envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy()),
                    envoy::config::cluster::v3::Cluster::DiscoveryType_Name(cluster.type())));
  }

  if (cluster.has_load_assignment()) {
    return absl::InvalidArgumentError(
        "ORIGINAL_DST clusters must have no load assignment configured");
  }

  // A header name without use_http_header would be silently ignored, so reject it.
  const auto& lb_config = cluster.original_dst_lb_config();
  if (!lb_config.use_http_header() && !lb_config.http_header_name().empty()) {
    return absl::InvalidArgumentError(fmt::format(
        "ORIGINAL_DST cluster: invalid config http_header_name={} and use_http_header is "
        "false. Set use_http_header to true if http_header_name is desired.",
        lb_config.http_header_name()));
  }

  // TODO(mattklein123): The original DST load balancer type should be deprecated and instead
  //                     the cluster should directly supply the load balancer. This will remove
  //                     a special case and allow this cluster to be compiled out as an extension.
  absl::Status status = absl::OkStatus();
  auto created_cluster =
      std::shared_ptr<OriginalDstCluster>(new OriginalDstCluster(cluster, context, status));
  RETURN_IF_NOT_OK(status);
  // The load balancer reaches the cluster through a shared handle.
  auto thread_aware_lb = std::make_unique<OriginalDstCluster::ThreadAwareLoadBalancer>(
      std::make_shared<OriginalDstClusterHandle>(created_cluster));
  return std::make_pair(created_cluster, std::move(thread_aware_lb));
}
357

            
358
/**
 * Static registration for the original dst cluster factory. @see RegisterFactory.
 */
361
REGISTER_FACTORY(OriginalDstClusterFactory, ClusterFactory);
362

            
363
class OriginalDstClusterFilterStateFactory : public Network::BaseAddressObjectFactory {
364
public:
365
14
  std::string name() const override { return std::string(OriginalDstClusterFilterStateKey); }
366
};
367

            
368
REGISTER_FACTORY(OriginalDstClusterFilterStateFactory, StreamInfo::FilterState::ObjectFactory);
369

            
370
} // namespace Upstream
371
} // namespace Envoy