/proc/self/cwd/source/extensions/clusters/original_dst/original_dst_cluster.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <cstdint> |
4 | | #include <functional> |
5 | | #include <string> |
6 | | |
7 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
8 | | #include "envoy/secret/secret_manager.h" |
9 | | #include "envoy/server/transport_socket_config.h" |
10 | | #include "envoy/stats/scope.h" |
11 | | #include "envoy/thread_local/thread_local.h" |
12 | | |
13 | | #include "source/common/common/empty_string.h" |
14 | | #include "source/common/common/logger.h" |
15 | | #include "source/common/config/metadata.h" |
16 | | #include "source/common/upstream/cluster_factory_impl.h" |
17 | | #include "source/common/upstream/upstream_impl.h" |
18 | | |
19 | | namespace Envoy { |
20 | | namespace Upstream { |
21 | | |
22 | | class OriginalDstClusterFactory; |
23 | | class OriginalDstClusterTest; |
24 | | |
25 | | struct HostsForAddress { |
26 | 0 | HostsForAddress(const HostSharedPtr& host) : host_(host), used_(true) {} |
27 | | |
28 | | // Primary host for the address. It is set by the first worker that posts to the main |
29 | | // thread to add a host, is const afterwards, and is read by all workers. |
30 | | const HostSharedPtr host_; |
31 | | // Additional hosts that were added concurrently with host_ are stored in this list. |
32 | | // It is populated on behalf of subsequent workers that had not yet received the |
33 | | // updated table in which host_ is set. The field is only accessed from the main |
34 | | // thread. |
35 | | std::vector<HostSharedPtr> hosts_; |
36 | | // Marks the entry as recently used by load balancers; initialized to true on construction. |
37 | | std::atomic<bool> used_; |
38 | | }; |
39 | | // Host table: a string key (presumably the address; confirm) maps to one shared HostsForAddress. |
40 | | using HostsForAddressSharedPtr = std::shared_ptr<HostsForAddress>; |
41 | | using HostMultiMap = absl::flat_hash_map<std::string, HostsForAddressSharedPtr>; |
42 | | using HostMultiMapSharedPtr = std::shared_ptr<HostMultiMap>; |
43 | | using HostMultiMapConstSharedPtr = std::shared_ptr<const HostMultiMap>; |
44 | | |
45 | | class OriginalDstCluster; |
46 | | |
47 | | // Handle that shares ownership of an OriginalDstCluster. Its sole purpose is to ensure that the |
48 | | // inner cluster's destructor is called on the main thread (see the out-of-line destructor). |
49 | | class OriginalDstClusterHandle { |
50 | | public: |
51 | | OriginalDstClusterHandle(std::shared_ptr<OriginalDstCluster> cluster) |
52 | 0 | : cluster_(std::move(cluster)) {} |
53 | | ~OriginalDstClusterHandle(); |
54 | | |
55 | | private: |
56 | | std::shared_ptr<OriginalDstCluster> cluster_; |
57 | | friend class OriginalDstCluster; |
58 | | }; |
59 | | |
60 | | using OriginalDstClusterHandleSharedPtr = std::shared_ptr<OriginalDstClusterHandle>; |
61 | | |
62 | | /** |
63 | | * The OriginalDstCluster is a dynamic cluster that automatically adds hosts as needed, based on |
64 | | * the original destination address of the downstream connection. These hosts are also |
65 | | * automatically cleaned up once they have not seen traffic for a configurable cleanup interval |
66 | | * ("cleanup_interval_ms"). |
67 | | */ |
68 | | class OriginalDstCluster : public ClusterImplBase { |
69 | | public: |
70 | 0 | ~OriginalDstCluster() override { |
71 | 0 | ASSERT_IS_MAIN_OR_TEST_THREAD(); |
72 | 0 | cleanup_timer_->disableTimer(); |
73 | 0 | } |
74 | | |
75 | | // Upstream::Cluster |
76 | 0 | InitializePhase initializePhase() const override { return InitializePhase::Primary; } |
77 | | |
78 | | /** |
79 | | * Special Load Balancer for Original Dst Cluster. |
80 | | * |
81 | | * The load balancer is called with the downstream context, which can be used to make sure the |
82 | | * Original Dst cluster has a Host for the original destination. Normally load balancers can't |
83 | | * modify clusters, but in this case we access a singleton OriginalDstCluster that we can ask to |
84 | | * add hosts on demand. Additions are synced with all other threads so that the host set in the |
85 | | * cluster remains (eventually) consistent. If multiple threads add a host to the same upstream |
86 | | * address then two distinct HostSharedPtr's (with the same upstream IP address) will be added, |
87 | | * and both of them will eventually time out. |
88 | | */ |
89 | | class LoadBalancer : public Upstream::LoadBalancer { |
90 | | public: |
91 | | LoadBalancer(const OriginalDstClusterHandleSharedPtr& parent) |
92 | | : parent_(parent), http_header_name_(parent->cluster_->httpHeaderName()), |
93 | | metadata_key_(parent->cluster_->metadataKey()), |
94 | | port_override_(parent->cluster_->portOverride()), |
95 | 0 | host_map_(parent->cluster_->getCurrentHostMap()) {} |
96 | | |
97 | | // Upstream::LoadBalancer |
98 | | HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; |
99 | | // Preconnecting is not implemented for OriginalDstCluster, so there is never a host to peek. |
100 | 0 | HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } |
101 | | // Pool selection is not implemented for OriginalDstCluster; always yields absl::nullopt. |
102 | | absl::optional<Upstream::SelectedPoolAndConnection> |
103 | | selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, |
104 | | const Upstream::Host& /*host*/, |
105 | 0 | std::vector<uint8_t>& /*hash_key*/) override { |
106 | 0 | return absl::nullopt; |
107 | 0 | } |
108 | | // Lifetime tracking is not implemented for OriginalDstCluster; returns an empty OptRef. |
109 | 0 | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { |
110 | 0 | return {}; |
111 | 0 | } |
112 | | |
113 | | Network::Address::InstanceConstSharedPtr filterStateOverrideHost(LoadBalancerContext* context); |
114 | | Network::Address::InstanceConstSharedPtr requestOverrideHost(LoadBalancerContext* context); |
115 | | Network::Address::InstanceConstSharedPtr metadataOverrideHost(LoadBalancerContext* context); |
116 | | |
117 | | private: |
118 | | const OriginalDstClusterHandleSharedPtr parent_; |
119 | | // Optional HTTP header name; this and metadata_key_ reference cluster members kept alive by parent_. |
120 | | const absl::optional<Http::LowerCaseString>& http_header_name_; |
121 | | const absl::optional<Config::MetadataKey>& metadata_key_; |
122 | | const absl::optional<uint32_t> port_override_; |
123 | | HostMultiMapConstSharedPtr host_map_; |
124 | | }; |
125 | | // Read-only accessors; LoadBalancer binds or copies these values in its constructor. |
126 | 0 | const absl::optional<Http::LowerCaseString>& httpHeaderName() const { return http_header_name_; } |
127 | 0 | const absl::optional<Config::MetadataKey>& metadataKey() const { return metadata_key_; } |
128 | 0 | absl::optional<uint32_t> portOverride() const { return port_override_; } |
129 | | |
130 | | private: |
131 | | friend class OriginalDstClusterFactory; |
132 | | friend class OriginalDstClusterTest; |
133 | | OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config, |
134 | | ClusterFactoryContext& context); |
135 | | |
136 | | struct LoadBalancerFactory : public Upstream::LoadBalancerFactory { |
137 | 0 | LoadBalancerFactory(const OriginalDstClusterHandleSharedPtr& cluster) : cluster_(cluster) {} |
138 | | |
139 | | // Upstream::LoadBalancerFactory: every created LoadBalancer shares the same cluster handle. |
140 | 0 | Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override { |
141 | 0 | return std::make_unique<LoadBalancer>(cluster_); |
142 | 0 | } |
143 | | |
144 | | const OriginalDstClusterHandleSharedPtr cluster_; |
145 | | }; |
146 | | |
147 | | struct ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { |
148 | 0 | ThreadAwareLoadBalancer(const OriginalDstClusterHandleSharedPtr& cluster) : cluster_(cluster) {} |
149 | | |
150 | | // Upstream::ThreadAwareLoadBalancer: factory() builds a new LoadBalancerFactory on each call. |
151 | 0 | Upstream::LoadBalancerFactorySharedPtr factory() override { |
152 | 0 | return std::make_shared<LoadBalancerFactory>(cluster_); |
153 | 0 | } |
154 | 0 | void initialize() override {} |
155 | | |
156 | | const OriginalDstClusterHandleSharedPtr cluster_; |
157 | | }; |
158 | | // Returns the current host map; the shared pointer is copied while holding a reader lock. |
159 | 0 | HostMultiMapConstSharedPtr getCurrentHostMap() { |
160 | 0 | absl::ReaderMutexLock lock(&host_map_lock_); |
161 | 0 | return host_map_; |
162 | 0 | } |
163 | | // Replaces the host map pointer while holding a writer lock. |
164 | 0 | void setHostMap(const HostMultiMapConstSharedPtr& new_host_map) { |
165 | 0 | absl::WriterMutexLock lock(&host_map_lock_); |
166 | 0 | host_map_ = new_host_map; |
167 | 0 | } |
168 | | |
169 | | void addHost(HostSharedPtr&); |
170 | | void cleanup(); |
171 | | |
172 | | // ClusterImplBase: pre-init completes immediately. |
173 | 0 | void startPreInit() override { onPreInitComplete(); } |
174 | | |
175 | | Event::Dispatcher& dispatcher_; |
176 | | const std::chrono::milliseconds cleanup_interval_ms_; |
177 | | Event::TimerPtr cleanup_timer_; |
178 | | |
179 | | absl::Mutex host_map_lock_; |
180 | | HostMultiMapConstSharedPtr host_map_ ABSL_GUARDED_BY(host_map_lock_); |
181 | | absl::optional<Http::LowerCaseString> http_header_name_; |
182 | | absl::optional<Config::MetadataKey> metadata_key_; |
183 | | absl::optional<uint32_t> port_override_; |
184 | | |
185 | | friend class OriginalDstClusterHandle; |
186 | | }; |
187 | | // Filter state key name. NOTE(review): presumably read by LoadBalancer::filterStateOverrideHost; confirm. |
188 | | constexpr absl::string_view OriginalDstClusterFilterStateKey = |
189 | | "envoy.network.transport_socket.original_dst_address"; |
190 | | // Cluster factory that passes the name "envoy.cluster.original_dst" to ClusterFactoryImplBase. |
191 | | class OriginalDstClusterFactory : public ClusterFactoryImplBase { |
192 | | public: |
193 | 4 | OriginalDstClusterFactory() : ClusterFactoryImplBase("envoy.cluster.original_dst") {} |
194 | | |
195 | | private: |
196 | | friend class OriginalDstClusterTest; |
197 | | absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>> |
198 | | createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster, |
199 | | ClusterFactoryContext& context) override; |
200 | | }; |
201 | | |
202 | | DECLARE_FACTORY(OriginalDstClusterFactory); |
203 | | |
204 | | } // namespace Upstream |
205 | | } // namespace Envoy |