Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <chrono>
4
#include <cstdint>
5
#include <list>
6
#include <memory>
7
#include <string>
8
#include <vector>
9
10
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
11
#include "envoy/stats/stats_macros.h"
12
#include "envoy/thread_local/thread_local.h"
13
#include "envoy/upstream/cluster_manager.h"
14
15
#include "source/common/buffer/buffer_impl.h"
16
#include "source/common/common/token_bucket_impl.h"
17
#include "source/common/network/address_impl.h"
18
#include "source/common/network/filter_impl.h"
19
#include "source/common/protobuf/utility.h"
20
#include "source/common/singleton/const_singleton.h"
21
#include "source/common/upstream/load_balancer_impl.h"
22
#include "source/common/upstream/upstream_impl.h"
23
#include "source/extensions/clusters/redis/redis_cluster_lb.h"
24
#include "source/extensions/common/dynamic_forward_proxy/dns_cache.h"
25
#include "source/extensions/common/redis/cluster_refresh_manager.h"
26
#include "source/extensions/filters/network/common/redis/client.h"
27
#include "source/extensions/filters/network/common/redis/client_impl.h"
28
#include "source/extensions/filters/network/common/redis/codec_impl.h"
29
#include "source/extensions/filters/network/common/redis/utility.h"
30
#include "source/extensions/filters/network/redis_proxy/conn_pool.h"
31
32
#include "absl/container/node_hash_map.h"
33
34
namespace Envoy {
35
namespace Extensions {
36
namespace NetworkFilters {
37
namespace RedisProxy {
38
namespace ConnPool {
39
40
// TODO(mattklein123): Circuit breaking
41
// TODO(rshriram): Fault injection
42
43
#define REDIS_CLUSTER_STATS(COUNTER)                                                               \
44
  COUNTER(upstream_cx_drained)                                                                     \
45
  COUNTER(max_upstream_unknown_connections_reached)                                                \
46
  COUNTER(connection_rate_limited)
47
48
struct RedisClusterStats {
49
  REDIS_CLUSTER_STATS(GENERATE_COUNTER_STRUCT)
50
};
51
52
class DoNothingPoolCallbacks : public PoolCallbacks {
53
public:
54
0
  void onResponse(Common::Redis::RespValuePtr&&) override{};
55
0
  void onFailure() override{};
56
};
57
58
class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
59
public:
60
  InstanceImpl(
61
      const std::string& cluster_name, Upstream::ClusterManager& cm,
62
      Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls,
63
      const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::ConnPoolSettings&
64
          config,
65
      Api::Api& api, Stats::ScopeSharedPtr&& stats_scope,
66
      const Common::Redis::RedisCommandStatsSharedPtr& redis_command_stats,
67
      Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager,
68
      const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr& dns_cache);
69
  // RedisProxy::ConnPool::Instance
70
  Common::Redis::Client::PoolRequest*
71
  makeRequest(const std::string& key, RespVariant&& request, PoolCallbacks& callbacks,
72
              Common::Redis::Client::Transaction& transaction) override;
73
  /**
74
   * Makes a redis request based on IP address and TCP port of the upstream host (e.g.,
75
   * moved/ask cluster redirection). This is now only kept mostly for testing.
76
   * @param host_address supplies the IP address and TCP port of the upstream host to receive
77
   * the request.
78
   * @param request supplies the Redis request to make.
79
   * @param callbacks supplies the request completion callbacks.
80
   * @return PoolRequest* a handle to the active request or nullptr if the request could not be
81
   * made for some reason.
82
   */
83
  Common::Redis::Client::PoolRequest*
84
  makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request,
85
                    Common::Redis::Client::ClientCallbacks& callbacks);
86
87
  void init();
88
89
  // Allow the unit test to have access to private members.
90
  friend class RedisConnPoolImplTest;
91
92
private:
93
  struct ThreadLocalPool;
94
95
  struct ThreadLocalActiveClient : public Network::ConnectionCallbacks {
96
0
    ThreadLocalActiveClient(ThreadLocalPool& parent) : parent_(parent) {}
97
98
    // Network::ConnectionCallbacks
99
    void onEvent(Network::ConnectionEvent event) override;
100
0
    void onAboveWriteBufferHighWatermark() override {}
101
0
    void onBelowWriteBufferLowWatermark() override {}
102
103
    ThreadLocalPool& parent_;
104
    Upstream::HostConstSharedPtr host_;
105
    Common::Redis::Client::ClientPtr redis_client_;
106
  };
107
108
  using ThreadLocalActiveClientPtr = std::unique_ptr<ThreadLocalActiveClient>;
109
110
  struct PendingRequest
111
      : public Common::Redis::Client::ClientCallbacks,
112
        public Common::Redis::Client::PoolRequest,
113
        public Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryCallbacks,
114
        public Logger::Loggable<Logger::Id::redis> {
115
    PendingRequest(ThreadLocalPool& parent, RespVariant&& incoming_request,
116
                   PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host);
117
    ~PendingRequest() override;
118
119
    // Common::Redis::Client::ClientCallbacks
120
    void onResponse(Common::Redis::RespValuePtr&& response) override;
121
    void onFailure() override;
122
    void onRedirection(Common::Redis::RespValuePtr&& value, const std::string& host_address,
123
                       bool ask_redirection) override;
124
125
    // PoolRequest
126
    void cancel() override;
127
128
    // Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryCallbacks
129
    void onLoadDnsCacheComplete(
130
        const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr&) override;
131
132
    std::string formatAddress(const Envoy::Network::Address::Ip& ip);
133
    void doRedirection(Common::Redis::RespValuePtr&& value, const std::string& host_address,
134
                       bool ask_redirection);
135
136
    ThreadLocalPool& parent_;
137
    const RespVariant incoming_request_;
138
    Common::Redis::Client::PoolRequest* request_handler_;
139
    PoolCallbacks& pool_callbacks_;
140
    Upstream::HostConstSharedPtr host_;
141
    Common::Redis::RespValuePtr resp_value_;
142
    bool ask_redirection_;
143
    Extensions::Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryHandlePtr
144
        cache_load_handle_;
145
  };
146
147
  struct ThreadLocalPool : public ThreadLocal::ThreadLocalObject,
148
                           public Upstream::ClusterUpdateCallbacks,
149
                           public Logger::Loggable<Logger::Id::redis> {
150
    ThreadLocalPool(std::shared_ptr<InstanceImpl> parent, Event::Dispatcher& dispatcher,
151
                    std::string cluster_name,
152
                    const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr& dns_cache);
153
    ~ThreadLocalPool() override;
154
    ThreadLocalActiveClientPtr& threadLocalActiveClient(Upstream::HostConstSharedPtr host);
155
    Common::Redis::Client::PoolRequest*
156
    makeRequest(const std::string& key, RespVariant&& request, PoolCallbacks& callbacks,
157
                Common::Redis::Client::Transaction& transaction);
158
    Common::Redis::Client::PoolRequest*
159
    makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request,
160
                      Common::Redis::Client::ClientCallbacks& callbacks);
161
162
    void onClusterAddOrUpdateNonVirtual(absl::string_view cluster_name,
163
                                        Upstream::ThreadLocalClusterCommand& get_cluster);
164
    void onHostsAdded(const std::vector<Upstream::HostSharedPtr>& hosts_added);
165
    void onHostsRemoved(const std::vector<Upstream::HostSharedPtr>& hosts_removed);
166
    void drainClients();
167
168
    // Upstream::ClusterUpdateCallbacks
169
    void onClusterAddOrUpdate(absl::string_view cluster_name,
170
0
                              Upstream::ThreadLocalClusterCommand& get_cluster) override {
171
0
      onClusterAddOrUpdateNonVirtual(cluster_name, get_cluster);
172
0
    }
173
    void onClusterRemoval(const std::string& cluster_name) override;
174
175
    void onRequestCompleted();
176
177
    std::weak_ptr<InstanceImpl> parent_;
178
    Event::Dispatcher& dispatcher_;
179
    const std::string cluster_name_;
180
    const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_{nullptr};
181
    Upstream::ClusterUpdateCallbacksHandlePtr cluster_update_handle_;
182
    Upstream::ThreadLocalCluster* cluster_{};
183
    absl::node_hash_map<Upstream::HostConstSharedPtr, ThreadLocalActiveClientPtr> client_map_;
184
    absl::node_hash_map<Upstream::HostConstSharedPtr, TokenBucketPtr> cx_rate_limiter_map_;
185
    Envoy::Common::CallbackHandlePtr host_set_member_update_cb_handle_;
186
    absl::node_hash_map<std::string, Upstream::HostConstSharedPtr> host_address_map_;
187
    std::string auth_username_;
188
    std::string auth_password_;
189
    std::list<Upstream::HostSharedPtr> created_via_redirect_hosts_;
190
    std::list<ThreadLocalActiveClientPtr> clients_to_drain_;
191
    std::list<PendingRequest> pending_requests_;
192
193
    /* This timer is used to poll the active clients in clients_to_drain_ to determine whether they
194
     * have been drained (have no active requests) or not. It is only enabled after a client has
195
     * been added to clients_to_drain_, and is only re-enabled as long as that list is not empty. A
196
     * timer is being used as opposed to using a callback to avoid adding a check of
197
     * clients_to_drain_ to the main data code path as this should only rarely be not empty.
198
     */
199
    Event::TimerPtr drain_timer_;
200
    bool is_redis_cluster_{false};
201
    Common::Redis::Client::ClientFactory& client_factory_;
202
    Common::Redis::Client::ConfigSharedPtr config_;
203
    Stats::ScopeSharedPtr stats_scope_;
204
    Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
205
    RedisClusterStats redis_cluster_stats_;
206
    const Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_;
207
  };
208
209
  const std::string cluster_name_;
210
  Upstream::ClusterManager& cm_;
211
  Common::Redis::Client::ClientFactory& client_factory_;
212
  ThreadLocal::SlotPtr tls_;
213
  Common::Redis::Client::ConfigSharedPtr config_;
214
  Api::Api& api_;
215
  Stats::ScopeSharedPtr stats_scope_;
216
  Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
217
  RedisClusterStats redis_cluster_stats_;
218
  const Extensions::Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_;
219
  const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_{nullptr};
220
};
221
222
} // namespace ConnPool
223
} // namespace RedisProxy
224
} // namespace NetworkFilters
225
} // namespace Extensions
226
} // namespace Envoy