/proc/self/cwd/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <list> |
6 | | #include <memory> |
7 | | #include <string> |
8 | | #include <vector> |
9 | | |
10 | | #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" |
11 | | #include "envoy/stats/stats_macros.h" |
12 | | #include "envoy/thread_local/thread_local.h" |
13 | | #include "envoy/upstream/cluster_manager.h" |
14 | | |
15 | | #include "source/common/buffer/buffer_impl.h" |
16 | | #include "source/common/common/token_bucket_impl.h" |
17 | | #include "source/common/network/address_impl.h" |
18 | | #include "source/common/network/filter_impl.h" |
19 | | #include "source/common/protobuf/utility.h" |
20 | | #include "source/common/singleton/const_singleton.h" |
21 | | #include "source/common/upstream/load_balancer_impl.h" |
22 | | #include "source/common/upstream/upstream_impl.h" |
23 | | #include "source/extensions/clusters/redis/redis_cluster_lb.h" |
24 | | #include "source/extensions/common/dynamic_forward_proxy/dns_cache.h" |
25 | | #include "source/extensions/common/redis/cluster_refresh_manager.h" |
26 | | #include "source/extensions/filters/network/common/redis/client.h" |
27 | | #include "source/extensions/filters/network/common/redis/client_impl.h" |
28 | | #include "source/extensions/filters/network/common/redis/codec_impl.h" |
29 | | #include "source/extensions/filters/network/common/redis/utility.h" |
30 | | #include "source/extensions/filters/network/redis_proxy/conn_pool.h" |
31 | | |
32 | | #include "absl/container/node_hash_map.h" |
33 | | |
34 | | namespace Envoy { |
35 | | namespace Extensions { |
36 | | namespace NetworkFilters { |
37 | | namespace RedisProxy { |
38 | | namespace ConnPool { |
39 | | |
40 | | // TODO(mattklein123): Circuit breaking |
41 | | // TODO(rshriram): Fault injection |
42 | | |
43 | | #define REDIS_CLUSTER_STATS(COUNTER) \ |
44 | | COUNTER(upstream_cx_drained) \ |
45 | | COUNTER(max_upstream_unknown_connections_reached) \ |
46 | | COUNTER(connection_rate_limited) |
47 | | |
48 | | struct RedisClusterStats { |
49 | | REDIS_CLUSTER_STATS(GENERATE_COUNTER_STRUCT) |
50 | | }; |
51 | | |
52 | | class DoNothingPoolCallbacks : public PoolCallbacks { |
53 | | public: |
54 | 0 | void onResponse(Common::Redis::RespValuePtr&&) override{}; |
55 | 0 | void onFailure() override{}; |
56 | | }; |
57 | | |
58 | | class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> { |
59 | | public: |
60 | | InstanceImpl( |
61 | | const std::string& cluster_name, Upstream::ClusterManager& cm, |
62 | | Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, |
63 | | const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::ConnPoolSettings& |
64 | | config, |
65 | | Api::Api& api, Stats::ScopeSharedPtr&& stats_scope, |
66 | | const Common::Redis::RedisCommandStatsSharedPtr& redis_command_stats, |
67 | | Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager, |
68 | | const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr& dns_cache); |
69 | | // RedisProxy::ConnPool::Instance |
70 | | Common::Redis::Client::PoolRequest* |
71 | | makeRequest(const std::string& key, RespVariant&& request, PoolCallbacks& callbacks, |
72 | | Common::Redis::Client::Transaction& transaction) override; |
73 | | /** |
74 | | * Makes a redis request based on IP address and TCP port of the upstream host (e.g., |
75 | | * moved/ask cluster redirection). This is now only kept mostly for testing. |
76 | | * @param host_address supplies the IP address and TCP port of the upstream host to receive |
77 | | * the request. |
78 | | * @param request supplies the Redis request to make. |
79 | | * @param callbacks supplies the request completion callbacks. |
80 | | * @return PoolRequest* a handle to the active request or nullptr if the request could not be |
81 | | * made for some reason. |
82 | | */ |
83 | | Common::Redis::Client::PoolRequest* |
84 | | makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request, |
85 | | Common::Redis::Client::ClientCallbacks& callbacks); |
86 | | |
87 | | void init(); |
88 | | |
89 | | // Allow the unit test to have access to private members. |
90 | | friend class RedisConnPoolImplTest; |
91 | | |
92 | | private: |
93 | | struct ThreadLocalPool; |
94 | | |
95 | | struct ThreadLocalActiveClient : public Network::ConnectionCallbacks { |
96 | 0 | ThreadLocalActiveClient(ThreadLocalPool& parent) : parent_(parent) {} |
97 | | |
98 | | // Network::ConnectionCallbacks |
99 | | void onEvent(Network::ConnectionEvent event) override; |
100 | 0 | void onAboveWriteBufferHighWatermark() override {} |
101 | 0 | void onBelowWriteBufferLowWatermark() override {} |
102 | | |
103 | | ThreadLocalPool& parent_; |
104 | | Upstream::HostConstSharedPtr host_; |
105 | | Common::Redis::Client::ClientPtr redis_client_; |
106 | | }; |
107 | | |
108 | | using ThreadLocalActiveClientPtr = std::unique_ptr<ThreadLocalActiveClient>; |
109 | | |
110 | | struct PendingRequest |
111 | | : public Common::Redis::Client::ClientCallbacks, |
112 | | public Common::Redis::Client::PoolRequest, |
113 | | public Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryCallbacks, |
114 | | public Logger::Loggable<Logger::Id::redis> { |
115 | | PendingRequest(ThreadLocalPool& parent, RespVariant&& incoming_request, |
116 | | PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host); |
117 | | ~PendingRequest() override; |
118 | | |
119 | | // Common::Redis::Client::ClientCallbacks |
120 | | void onResponse(Common::Redis::RespValuePtr&& response) override; |
121 | | void onFailure() override; |
122 | | void onRedirection(Common::Redis::RespValuePtr&& value, const std::string& host_address, |
123 | | bool ask_redirection) override; |
124 | | |
125 | | // PoolRequest |
126 | | void cancel() override; |
127 | | |
128 | | // Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryCallbacks |
129 | | void onLoadDnsCacheComplete( |
130 | | const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr&) override; |
131 | | |
132 | | std::string formatAddress(const Envoy::Network::Address::Ip& ip); |
133 | | void doRedirection(Common::Redis::RespValuePtr&& value, const std::string& host_address, |
134 | | bool ask_redirection); |
135 | | |
136 | | ThreadLocalPool& parent_; |
137 | | const RespVariant incoming_request_; |
138 | | Common::Redis::Client::PoolRequest* request_handler_; |
139 | | PoolCallbacks& pool_callbacks_; |
140 | | Upstream::HostConstSharedPtr host_; |
141 | | Common::Redis::RespValuePtr resp_value_; |
142 | | bool ask_redirection_; |
143 | | Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryHandlePtr |
144 | | cache_load_handle_; |
145 | | }; |
146 | | |
147 | | struct ThreadLocalPool : public ThreadLocal::ThreadLocalObject, |
148 | | public Upstream::ClusterUpdateCallbacks, |
149 | | public Logger::Loggable<Logger::Id::redis> { |
150 | | ThreadLocalPool(std::shared_ptr<InstanceImpl> parent, Event::Dispatcher& dispatcher, |
151 | | std::string cluster_name, |
152 | | const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr& dns_cache); |
153 | | ~ThreadLocalPool() override; |
154 | | ThreadLocalActiveClientPtr& threadLocalActiveClient(Upstream::HostConstSharedPtr host); |
155 | | Common::Redis::Client::PoolRequest* |
156 | | makeRequest(const std::string& key, RespVariant&& request, PoolCallbacks& callbacks, |
157 | | Common::Redis::Client::Transaction& transaction); |
158 | | Common::Redis::Client::PoolRequest* |
159 | | makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request, |
160 | | Common::Redis::Client::ClientCallbacks& callbacks); |
161 | | |
162 | | void onClusterAddOrUpdateNonVirtual(absl::string_view cluster_name, |
163 | | Upstream::ThreadLocalClusterCommand& get_cluster); |
164 | | void onHostsAdded(const std::vector<Upstream::HostSharedPtr>& hosts_added); |
165 | | void onHostsRemoved(const std::vector<Upstream::HostSharedPtr>& hosts_removed); |
166 | | void drainClients(); |
167 | | |
168 | | // Upstream::ClusterUpdateCallbacks |
169 | | void onClusterAddOrUpdate(absl::string_view cluster_name, |
170 | 0 | Upstream::ThreadLocalClusterCommand& get_cluster) override { |
171 | 0 | onClusterAddOrUpdateNonVirtual(cluster_name, get_cluster); |
172 | 0 | } |
173 | | void onClusterRemoval(const std::string& cluster_name) override; |
174 | | |
175 | | void onRequestCompleted(); |
176 | | |
177 | | std::weak_ptr<InstanceImpl> parent_; |
178 | | Event::Dispatcher& dispatcher_; |
179 | | const std::string cluster_name_; |
180 | | const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_{nullptr}; |
181 | | Upstream::ClusterUpdateCallbacksHandlePtr cluster_update_handle_; |
182 | | Upstream::ThreadLocalCluster* cluster_{}; |
183 | | absl::node_hash_map<Upstream::HostConstSharedPtr, ThreadLocalActiveClientPtr> client_map_; |
184 | | absl::node_hash_map<Upstream::HostConstSharedPtr, TokenBucketPtr> cx_rate_limiter_map_; |
185 | | Envoy::Common::CallbackHandlePtr host_set_member_update_cb_handle_; |
186 | | absl::node_hash_map<std::string, Upstream::HostConstSharedPtr> host_address_map_; |
187 | | std::string auth_username_; |
188 | | std::string auth_password_; |
189 | | std::list<Upstream::HostSharedPtr> created_via_redirect_hosts_; |
190 | | std::list<ThreadLocalActiveClientPtr> clients_to_drain_; |
191 | | std::list<PendingRequest> pending_requests_; |
192 | | |
193 | | /* This timer is used to poll the active clients in clients_to_drain_ to determine whether they |
194 | | * have been drained (have no active requests) or not. It is only enabled after a client has |
195 | | * been added to clients_to_drain_, and is only re-enabled as long as that list is not empty. A |
196 | | * timer is being used as opposed to using a callback to avoid adding a check of |
197 | | * clients_to_drain_ to the main data code path as this should only rarely be not empty. |
198 | | */ |
199 | | Event::TimerPtr drain_timer_; |
200 | | bool is_redis_cluster_{false}; |
201 | | Common::Redis::Client::ClientFactory& client_factory_; |
202 | | Common::Redis::Client::ConfigSharedPtr config_; |
203 | | Stats::ScopeSharedPtr stats_scope_; |
204 | | Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; |
205 | | RedisClusterStats redis_cluster_stats_; |
206 | | const Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_; |
207 | | }; |
208 | | |
209 | | const std::string cluster_name_; |
210 | | Upstream::ClusterManager& cm_; |
211 | | Common::Redis::Client::ClientFactory& client_factory_; |
212 | | ThreadLocal::SlotPtr tls_; |
213 | | Common::Redis::Client::ConfigSharedPtr config_; |
214 | | Api::Api& api_; |
215 | | Stats::ScopeSharedPtr stats_scope_; |
216 | | Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; |
217 | | RedisClusterStats redis_cluster_stats_; |
218 | | const Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_; |
219 | | const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_{nullptr}; |
220 | | }; |
221 | | |
222 | | } // namespace ConnPool |
223 | | } // namespace RedisProxy |
224 | | } // namespace NetworkFilters |
225 | | } // namespace Extensions |
226 | | } // namespace Envoy |