/proc/self/cwd/source/extensions/filters/network/redis_proxy/proxy_filter.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <cstdint> |
4 | | #include <list> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" |
9 | | #include "envoy/network/drain_decision.h" |
10 | | #include "envoy/network/filter.h" |
11 | | #include "envoy/stats/scope.h" |
12 | | #include "envoy/upstream/cluster_manager.h" |
13 | | |
14 | | #include "source/common/buffer/buffer_impl.h" |
15 | | #include "source/extensions/common/dynamic_forward_proxy/dns_cache.h" |
16 | | #include "source/extensions/filters/network/common/redis/codec.h" |
17 | | #include "source/extensions/filters/network/redis_proxy/command_splitter.h" |
18 | | |
19 | | namespace Envoy { |
20 | | namespace Extensions { |
21 | | namespace NetworkFilters { |
22 | | namespace RedisProxy { |
23 | | |
24 | | /** |
25 | | * All redis proxy stats. @see stats_macros.h |
 | | * |
 | | * List macro parameterized by two caller-supplied macros: every counter entry is emitted as |
 | | * COUNTER(name) and every gauge entry as GAUGE(name, Accumulate). The list holds six |
 | | * counters followed by four gauges. In this header it is expanded once, inside ProxyStats, |
 | | * with GENERATE_COUNTER_STRUCT and GENERATE_GAUGE_STRUCT. |
 | | * |
 | | * NOTE(review): "cx" and "rq" presumably abbreviate connection and request, and the meaning |
 | | * of the Accumulate argument is defined outside this file (see stats_macros.h) - confirm. |
26 | | */ |
27 | | #define ALL_REDIS_PROXY_STATS(COUNTER, GAUGE) \ |
28 | 0 | COUNTER(downstream_cx_drain_close) \ |
29 | 0 | COUNTER(downstream_cx_protocol_error) \ |
30 | 0 | COUNTER(downstream_cx_rx_bytes_total) \ |
31 | 0 | COUNTER(downstream_cx_total) \ |
32 | 0 | COUNTER(downstream_cx_tx_bytes_total) \ |
33 | 0 | COUNTER(downstream_rq_total) \ |
34 | 0 | GAUGE(downstream_cx_active, Accumulate) \ |
35 | 0 | GAUGE(downstream_cx_rx_bytes_buffered, Accumulate) \ |
36 | 0 | GAUGE(downstream_cx_tx_bytes_buffered, Accumulate) \ |
37 | 0 | GAUGE(downstream_rq_active, Accumulate) |
38 | | |
39 | | /** |
40 | | * Struct definition for all redis proxy stats. @see stats_macros.h |
 | | * |
 | | * The struct has no hand-written members: its whole body is the expansion of |
 | | * ALL_REDIS_PROXY_STATS with GENERATE_COUNTER_STRUCT and GENERATE_GAUGE_STRUCT, so adding or |
 | | * removing an entry in that list changes this struct. ProxyFilterConfig stores one instance |
 | | * as stats_ and declares generateStats() to produce it. |
 | | * |
 | | * NOTE(review): the two GENERATE_* macros are not defined in this file; presumably each one |
 | | * declares a member per listed stat - confirm against stats_macros.h. |
41 | | */ |
42 | | struct ProxyStats { |
43 | | ALL_REDIS_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) |
44 | | }; |
45 | | |
46 | | /** |
47 | | * Configuration for the redis proxy filter. |
 | | * |
 | | * Constructed from the RedisProxy v3 proto together with a stats scope, a drain decision, a |
 | | * runtime loader, an Api handle and a DNS cache manager factory. All data members are |
 | | * declared public. stats_ and downstream_auth_passwords_ are the only members that are |
 | | * neither const nor references; the rest cannot be reassigned after construction. |
 | | * |
 | | * Two members carry in-class defaults: redis_drain_close_runtime_key_ is initialized to |
 | | * "redis.drain_close_enabled" and dns_cache_ to nullptr. |
 | | * |
 | | * The private helpers generateStats() (static, takes a prefix and a scope, returns a |
 | | * ProxyStats) and getCache() (takes the proto, returns a DnsCacheSharedPtr) are only |
 | | * declared here. |
 | | * |
 | | * NOTE(review): the constructor body is not part of this header; presumably it fills stats_ |
 | | * through generateStats() and dns_cache_ through getCache() - confirm in the .cc file. |
 | | * NOTE(review): downstream_auth_passwords_ is a std::vector, but <vector> is not among the |
 | | * direct includes of this header; presumably it arrives transitively - consider including |
 | | * it explicitly. |
48 | | */ |
49 | | class ProxyFilterConfig : public Logger::Loggable<Logger::Id::redis> { |
50 | | public: |
51 | | ProxyFilterConfig( |
52 | | const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy& config, |
53 | | Stats::Scope& scope, const Network::DrainDecision& drain_decision, Runtime::Loader& runtime, |
54 | | Api::Api& api, |
55 | | Extensions::Common::DynamicForwardProxy::DnsCacheManagerFactory& cache_manager_factory); |
56 | | |
57 | | const Network::DrainDecision& drain_decision_; |
58 | | Runtime::Loader& runtime_; |
59 | | const std::string stat_prefix_; |
60 | | const std::string redis_drain_close_runtime_key_{"redis.drain_close_enabled"}; |
61 | | ProxyStats stats_; |
62 | | const std::string downstream_auth_username_; |
63 | | std::vector<std::string> downstream_auth_passwords_; |
64 | | |
65 | | // DNS cache used for ASK/MOVED responses. |
66 | | const Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr dns_cache_manager_; |
67 | | const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_{nullptr}; |
68 | | |
69 | | private: |
70 | | static ProxyStats generateStats(const std::string& prefix, Stats::Scope& scope); |
71 | | Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr |
72 | | getCache(const envoy::extensions::filters::network::redis_proxy::v3::RedisProxy& config); |
73 | | }; |
74 | | /** |
 | | * Shared-ownership pointer to a ProxyFilterConfig. ProxyFilter accepts one in its |
 | | * constructor and keeps it as config_. |
 | | */ |
75 | | using ProxyFilterConfigSharedPtr = std::shared_ptr<ProxyFilterConfig>; |
76 | | |
77 | | /** |
78 | | * A redis multiplexing proxy filter. This filter will take incoming redis pipelined commands, and |
79 | | * multiplex them onto a consistently hashed connection pool of backend servers. |
 | | * |
 | | * The class implements three callback interfaces: Network::ReadFilter |
 | | * (initializeReadFilterCallbacks, onData, onNewConnection), Network::ConnectionCallbacks |
 | | * (onEvent and the two write-buffer watermark callbacks) and Common::Redis::DecoderCallbacks |
 | | * (onRespValue). |
 | | * |
 | | * The methods defined inline in this header are trivial: onNewConnection() returns |
 | | * FilterStatus::Continue, both watermark callbacks have empty bodies, connectionAllowed() |
 | | * returns connection_allowed_ and transaction() returns a reference to transaction_. All |
 | | * other methods are only declared here. |
 | | * |
 | | * Outstanding requests are kept in pending_requests_, a std::list of PendingRequest. |
 | | * |
 | | * NOTE(review): connectionAllowed() only returns a member but is not const-qualified; |
 | | * presumably it could be - confirm no caller or override depends on the current signature. |
80 | | */ |
81 | | class ProxyFilter : public Network::ReadFilter, |
82 | | public Common::Redis::DecoderCallbacks, |
83 | | public Network::ConnectionCallbacks, |
84 | | public Logger::Loggable<Logger::Id::redis> { |
85 | | public: |
86 | | ProxyFilter(Common::Redis::DecoderFactory& factory, Common::Redis::EncoderPtr&& encoder, |
87 | | CommandSplitter::Instance& splitter, ProxyFilterConfigSharedPtr config); |
88 | | ~ProxyFilter() override; |
89 | | |
90 | | // Network::ReadFilter |
91 | | void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; |
92 | | Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; |
93 | 0 | Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; } |
94 | | |
95 | | // Network::ConnectionCallbacks |
96 | | void onEvent(Network::ConnectionEvent event) override; |
97 | 0 | void onAboveWriteBufferHighWatermark() override {} |
98 | 0 | void onBelowWriteBufferLowWatermark() override {} |
99 | | |
100 | | // Common::Redis::DecoderCallbacks |
101 | | void onRespValue(Common::Redis::RespValuePtr&& value) override; |
102 | | |
103 | 0 | bool connectionAllowed() { return connection_allowed_; } |
104 | | |
105 | 0 | Common::Redis::Client::Transaction& transaction() { return transaction_; } |
106 | | |
107 | | private: |
108 | | friend class RedisProxyFilterTest; |
109 | | /** |
 | | * Implements CommandSplitter::SplitCallbacks by forwarding every callback to the owning |
 | | * ProxyFilter (parent_). onQuit, onAuth and onResponse additionally pass *this as the |
 | | * request argument; connectionAllowed and transaction simply return the parent's value. |
 | | * Besides parent_, each instance holds a pending_response_ and a request_handle_. |
 | | * |
 | | * NOTE(review): the single-argument constructor is not marked explicit, so a ProxyFilter |
 | | * reference converts implicitly to a PendingRequest - confirm whether that is intended. |
 | | */ |
110 | | struct PendingRequest : public CommandSplitter::SplitCallbacks { |
111 | | PendingRequest(ProxyFilter& parent); |
112 | | ~PendingRequest() override; |
113 | | |
114 | | // RedisProxy::CommandSplitter::SplitCallbacks |
115 | 0 | bool connectionAllowed() override { return parent_.connectionAllowed(); } |
116 | 0 | void onQuit() override { parent_.onQuit(*this); } |
117 | 0 | void onAuth(const std::string& password) override { parent_.onAuth(*this, password); } |
118 | 0 | void onAuth(const std::string& username, const std::string& password) override { |
119 | 0 | parent_.onAuth(*this, username, password); |
120 | 0 | } |
121 | 0 | void onResponse(Common::Redis::RespValuePtr&& value) override { |
122 | 0 | parent_.onResponse(*this, std::move(value)); |
123 | 0 | } |
124 | | |
125 | 0 | Common::Redis::Client::Transaction& transaction() override { return parent_.transaction(); } |
126 | | |
127 | | ProxyFilter& parent_; |
128 | | Common::Redis::RespValuePtr pending_response_; |
129 | | CommandSplitter::SplitRequestPtr request_handle_; |
130 | | }; |
131 | | |
132 | | void onQuit(PendingRequest& request); |
133 | | void onAuth(PendingRequest& request, const std::string& password); |
134 | | void onAuth(PendingRequest& request, const std::string& username, const std::string& password); |
135 | | void onResponse(PendingRequest& request, Common::Redis::RespValuePtr&& value); |
136 | | bool checkPassword(const std::string& password); |
137 | | /** |
 | | * Data members. callbacks_ is value-initialized in-class (a null pointer until it is set). |
 | | * |
 | | * NOTE(review): connection_allowed_ and connection_quit_ have no in-class initializer, so |
 | | * they hold indeterminate values unless the constructor (not visible in this header) |
 | | * sets them - confirm in the .cc file or add default initializers. |
 | | */ |
138 | | Common::Redis::DecoderPtr decoder_; |
139 | | Common::Redis::EncoderPtr encoder_; |
140 | | CommandSplitter::Instance& splitter_; |
141 | | ProxyFilterConfigSharedPtr config_; |
142 | | Buffer::OwnedImpl encoder_buffer_; |
143 | | Network::ReadFilterCallbacks* callbacks_{}; |
144 | | std::list<PendingRequest> pending_requests_; |
145 | | bool connection_allowed_; |
146 | | Common::Redis::Client::Transaction transaction_; |
147 | | bool connection_quit_; |
148 | | }; |
149 | | |
150 | | } // namespace RedisProxy |
151 | | } // namespace NetworkFilters |
152 | | } // namespace Extensions |
153 | | } // namespace Envoy |