Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/redis_proxy/proxy_filter.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6
#include <string>
7
8
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
9
#include "envoy/network/drain_decision.h"
10
#include "envoy/network/filter.h"
11
#include "envoy/stats/scope.h"
12
#include "envoy/upstream/cluster_manager.h"
13
14
#include "source/common/buffer/buffer_impl.h"
15
#include "source/extensions/common/dynamic_forward_proxy/dns_cache.h"
16
#include "source/extensions/filters/network/common/redis/codec.h"
17
#include "source/extensions/filters/network/redis_proxy/command_splitter.h"
18
19
namespace Envoy {
20
namespace Extensions {
21
namespace NetworkFilters {
22
namespace RedisProxy {
23
24
/**
25
 * All redis proxy stats (six downstream counters, four downstream gauges). @see stats_macros.h
26
 */
27
#define ALL_REDIS_PROXY_STATS(COUNTER, GAUGE)                                                      \
28
0
  COUNTER(downstream_cx_drain_close)                                                               \
29
0
  COUNTER(downstream_cx_protocol_error)                                                            \
30
0
  COUNTER(downstream_cx_rx_bytes_total)                                                            \
31
0
  COUNTER(downstream_cx_total)                                                                     \
32
0
  COUNTER(downstream_cx_tx_bytes_total)                                                            \
33
0
  COUNTER(downstream_rq_total)                                                                     \
34
0
  GAUGE(downstream_cx_active, Accumulate)                                                          \
35
0
  GAUGE(downstream_cx_rx_bytes_buffered, Accumulate)                                               \
36
0
  GAUGE(downstream_cx_tx_bytes_buffered, Accumulate)                                               \
37
0
  GAUGE(downstream_rq_active, Accumulate)
38
39
/**
40
 * Struct definition for all redis proxy stats, expanded from ALL_REDIS_PROXY_STATS. @see stats_macros.h
41
 */
42
struct ProxyStats {
43
  ALL_REDIS_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
44
};
45
46
/**
47
 * Configuration for the redis proxy filter, constructed from a v3 RedisProxy proto message.
48
 */
49
class ProxyFilterConfig : public Logger::Loggable<Logger::Id::redis> {
50
public:
51
  ProxyFilterConfig(
52
      const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy& config,
53
      Stats::Scope& scope, const Network::DrainDecision& drain_decision, Runtime::Loader& runtime,
54
      Api::Api& api,
55
      Extensions::Common::DynamicForwardProxy::DnsCacheManagerFactory& cache_manager_factory);
56
57
  const Network::DrainDecision& drain_decision_; // Held by reference, not owned.
58
  Runtime::Loader& runtime_; // Held by reference, not owned.
59
  const std::string stat_prefix_; // NOTE(review): presumably the prefix passed to generateStats().
60
  const std::string redis_drain_close_runtime_key_{"redis.drain_close_enabled"}; // Fixed key name.
61
  ProxyStats stats_; // One member per ALL_REDIS_PROXY_STATS entry.
62
  const std::string downstream_auth_username_; // NOTE(review): presumably empty when unset.
63
  std::vector<std::string> downstream_auth_passwords_; // Only non-const member of this class.
64
65
  // DNS cache used for ASK/MOVED responses.
66
  const Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr dns_cache_manager_;
67
  const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_{nullptr}; // Null by default.
68
69
private:
70
  static ProxyStats generateStats(const std::string& prefix, Stats::Scope& scope); // Defined out of line.
71
  Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr
72
  getCache(const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy& config); // Defined out of line.
73
};
74
75
using ProxyFilterConfigSharedPtr = std::shared_ptr<ProxyFilterConfig>; // Shared-ownership handle.
76
77
/**
78
 * A redis multiplexing proxy filter. This filter will take incoming redis pipelined commands, and
79
 * multiplex them onto a consistently hashed connection pool of backend servers.
80
 */
81
class ProxyFilter : public Network::ReadFilter,
82
                    public Common::Redis::DecoderCallbacks,
83
                    public Network::ConnectionCallbacks,
84
                    public Logger::Loggable<Logger::Id::redis> {
85
public:
86
  ProxyFilter(Common::Redis::DecoderFactory& factory, Common::Redis::EncoderPtr&& encoder,
87
              CommandSplitter::Instance& splitter, ProxyFilterConfigSharedPtr config);
88
  ~ProxyFilter() override;
89
90
  // Network::ReadFilter
91
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
92
  Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
93
0
  Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
94
95
  // Network::ConnectionCallbacks
96
  void onEvent(Network::ConnectionEvent event) override;
97
0
  void onAboveWriteBufferHighWatermark() override {} // No-op.
98
0
  void onBelowWriteBufferLowWatermark() override {} // No-op.
99
100
  // Common::Redis::DecoderCallbacks
101
  void onRespValue(Common::Redis::RespValuePtr&& value) override;
102
103
0
  bool connectionAllowed() const { return connection_allowed_; } // Read-only accessor, hence const.
104
105
0
  Common::Redis::Client::Transaction& transaction() { return transaction_; }
106
107
private:
108
  friend class RedisProxyFilterTest;
109
110
  struct PendingRequest : public CommandSplitter::SplitCallbacks { // Forwards every callback to parent_.
111
    PendingRequest(ProxyFilter& parent);
112
    ~PendingRequest() override;
113
114
    // RedisProxy::CommandSplitter::SplitCallbacks
115
0
    bool connectionAllowed() override { return parent_.connectionAllowed(); }
116
0
    void onQuit() override { parent_.onQuit(*this); }
117
0
    void onAuth(const std::string& password) override { parent_.onAuth(*this, password); }
118
0
    void onAuth(const std::string& username, const std::string& password) override {
119
0
      parent_.onAuth(*this, username, password);
120
0
    }
121
0
    void onResponse(Common::Redis::RespValuePtr&& value) override {
122
0
      parent_.onResponse(*this, std::move(value));
123
0
    }
124
125
0
    Common::Redis::Client::Transaction& transaction() override { return parent_.transaction(); }
126
127
    ProxyFilter& parent_; // Filter that receives the forwarded callbacks.
128
    Common::Redis::RespValuePtr pending_response_; // NOTE(review): presumably the buffered reply; confirm in the .cc.
129
    CommandSplitter::SplitRequestPtr request_handle_; // NOTE(review): presumably the in-flight split request; confirm.
130
  };
131
132
  void onQuit(PendingRequest& request);
133
  void onAuth(PendingRequest& request, const std::string& password);
134
  void onAuth(PendingRequest& request, const std::string& username, const std::string& password);
135
  void onResponse(PendingRequest& request, Common::Redis::RespValuePtr&& value);
136
  bool checkPassword(const std::string& password);
137
138
  Common::Redis::DecoderPtr decoder_;
139
  Common::Redis::EncoderPtr encoder_;
140
  CommandSplitter::Instance& splitter_;
141
  ProxyFilterConfigSharedPtr config_;
142
  Buffer::OwnedImpl encoder_buffer_;
143
  Network::ReadFilterCallbacks* callbacks_{}; // Value-initialized to nullptr.
144
  std::list<PendingRequest> pending_requests_;
145
  bool connection_allowed_{false}; // Defaulted so the flag never holds an indeterminate value.
146
  Common::Redis::Client::Transaction transaction_;
147
  bool connection_quit_{false}; // Defaulted so the flag never holds an indeterminate value.
148
};
149
150
} // namespace RedisProxy
151
} // namespace NetworkFilters
152
} // namespace Extensions
153
} // namespace Envoy