/proc/self/cwd/source/extensions/filters/network/zookeeper_proxy/filter.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <memory> |
4 | | #include <string> |
5 | | #include <vector> |
6 | | |
7 | | #include "envoy/access_log/access_log.h" |
8 | | #include "envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.pb.h" |
9 | | #include "envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.pb.validate.h" |
10 | | #include "envoy/network/connection.h" |
11 | | #include "envoy/network/filter.h" |
12 | | #include "envoy/stats/scope.h" |
13 | | #include "envoy/stats/stats.h" |
14 | | #include "envoy/stats/stats_macros.h" |
15 | | |
16 | | #include "source/common/common/logger.h" |
17 | | #include "source/common/stats/symbol_table.h" |
18 | | #include "source/extensions/filters/network/zookeeper_proxy/decoder.h" |
19 | | |
20 | | namespace Envoy { |
21 | | namespace Extensions { |
22 | | namespace NetworkFilters { |
23 | | namespace ZooKeeperProxy { |
24 | | |
25 | | /** |
26 | | * All ZooKeeper proxy stats. @see stats_macros.h |
27 | | */ |
28 | | #define ALL_ZOOKEEPER_PROXY_STATS(COUNTER) \ |
29 | 625 | COUNTER(decoder_error) \ |
30 | 625 | COUNTER(request_bytes) \ |
31 | 625 | COUNTER(connect_rq_bytes) \ |
32 | 625 | COUNTER(connect_readonly_rq_bytes) \ |
33 | 625 | COUNTER(ping_rq_bytes) \ |
34 | 625 | COUNTER(auth_rq_bytes) \ |
35 | 625 | COUNTER(getdata_rq_bytes) \ |
36 | 625 | COUNTER(create_rq_bytes) \ |
37 | 625 | COUNTER(create2_rq_bytes) \ |
38 | 625 | COUNTER(createcontainer_rq_bytes) \ |
39 | 625 | COUNTER(createttl_rq_bytes) \ |
40 | 625 | COUNTER(setdata_rq_bytes) \ |
41 | 625 | COUNTER(getchildren_rq_bytes) \ |
42 | 625 | COUNTER(getchildren2_rq_bytes) \ |
43 | 625 | COUNTER(getephemerals_rq_bytes) \ |
44 | 625 | COUNTER(getallchildrennumber_rq_bytes) \ |
45 | 625 | COUNTER(delete_rq_bytes) \ |
46 | 625 | COUNTER(exists_rq_bytes) \ |
47 | 625 | COUNTER(getacl_rq_bytes) \ |
48 | 625 | COUNTER(setacl_rq_bytes) \ |
49 | 625 | COUNTER(sync_rq_bytes) \ |
50 | 625 | COUNTER(multi_rq_bytes) \ |
51 | 625 | COUNTER(reconfig_rq_bytes) \ |
52 | 625 | COUNTER(close_rq_bytes) \ |
53 | 625 | COUNTER(setauth_rq_bytes) \ |
54 | 625 | COUNTER(setwatches_rq_bytes) \ |
55 | 625 | COUNTER(setwatches2_rq_bytes) \ |
56 | 625 | COUNTER(addwatch_rq_bytes) \ |
57 | 625 | COUNTER(checkwatches_rq_bytes) \ |
58 | 625 | COUNTER(removewatches_rq_bytes) \ |
59 | 625 | COUNTER(check_rq_bytes) \ |
60 | 625 | COUNTER(connect_rq) \ |
61 | 625 | COUNTER(connect_readonly_rq) \ |
62 | 625 | COUNTER(getdata_rq) \ |
63 | 625 | COUNTER(create_rq) \ |
64 | 625 | COUNTER(create2_rq) \ |
65 | 625 | COUNTER(createcontainer_rq) \ |
66 | 625 | COUNTER(createttl_rq) \ |
67 | 625 | COUNTER(setdata_rq) \ |
68 | 625 | COUNTER(getchildren_rq) \ |
69 | 625 | COUNTER(getchildren2_rq) \ |
70 | 625 | COUNTER(getephemerals_rq) \ |
71 | 625 | COUNTER(getallchildrennumber_rq) \ |
72 | 625 | COUNTER(delete_rq) \ |
73 | 625 | COUNTER(exists_rq) \ |
74 | 625 | COUNTER(getacl_rq) \ |
75 | 625 | COUNTER(setacl_rq) \ |
76 | 625 | COUNTER(sync_rq) \ |
77 | 625 | COUNTER(ping_rq) \ |
78 | 625 | COUNTER(multi_rq) \ |
79 | 625 | COUNTER(reconfig_rq) \ |
80 | 625 | COUNTER(close_rq) \ |
81 | 625 | COUNTER(setauth_rq) \ |
82 | 625 | COUNTER(setwatches_rq) \ |
83 | 625 | COUNTER(setwatches2_rq) \ |
84 | 625 | COUNTER(addwatch_rq) \ |
85 | 625 | COUNTER(checkwatches_rq) \ |
86 | 625 | COUNTER(removewatches_rq) \ |
87 | 625 | COUNTER(check_rq) \ |
88 | 625 | COUNTER(response_bytes) \ |
89 | 625 | COUNTER(connect_resp_bytes) \ |
90 | 625 | COUNTER(ping_resp_bytes) \ |
91 | 625 | COUNTER(auth_resp_bytes) \ |
92 | 625 | COUNTER(getdata_resp_bytes) \ |
93 | 625 | COUNTER(create_resp_bytes) \ |
94 | 625 | COUNTER(create2_resp_bytes) \ |
95 | 625 | COUNTER(createcontainer_resp_bytes) \ |
96 | 625 | COUNTER(createttl_resp_bytes) \ |
97 | 625 | COUNTER(setdata_resp_bytes) \ |
98 | 625 | COUNTER(getchildren_resp_bytes) \ |
99 | 625 | COUNTER(getchildren2_resp_bytes) \ |
100 | 625 | COUNTER(getephemerals_resp_bytes) \ |
101 | 625 | COUNTER(getallchildrennumber_resp_bytes) \ |
102 | 625 | COUNTER(delete_resp_bytes) \ |
103 | 625 | COUNTER(exists_resp_bytes) \ |
104 | 625 | COUNTER(getacl_resp_bytes) \ |
105 | 625 | COUNTER(setacl_resp_bytes) \ |
106 | 625 | COUNTER(sync_resp_bytes) \ |
107 | 625 | COUNTER(multi_resp_bytes) \ |
108 | 625 | COUNTER(reconfig_resp_bytes) \ |
109 | 625 | COUNTER(close_resp_bytes) \ |
110 | 625 | COUNTER(setauth_resp_bytes) \ |
111 | 625 | COUNTER(setwatches_resp_bytes) \ |
112 | 625 | COUNTER(setwatches2_resp_bytes) \ |
113 | 625 | COUNTER(addwatch_resp_bytes) \ |
114 | 625 | COUNTER(checkwatches_resp_bytes) \ |
115 | 625 | COUNTER(removewatches_resp_bytes) \ |
116 | 625 | COUNTER(check_resp_bytes) \ |
117 | 625 | COUNTER(connect_resp) \ |
118 | 625 | COUNTER(ping_resp) \ |
119 | 625 | COUNTER(auth_resp) \ |
120 | 625 | COUNTER(getdata_resp) \ |
121 | 625 | COUNTER(create_resp) \ |
122 | 625 | COUNTER(create2_resp) \ |
123 | 625 | COUNTER(createcontainer_resp) \ |
124 | 625 | COUNTER(createttl_resp) \ |
125 | 625 | COUNTER(setdata_resp) \ |
126 | 625 | COUNTER(getchildren_resp) \ |
127 | 625 | COUNTER(getchildren2_resp) \ |
128 | 625 | COUNTER(getephemerals_resp) \ |
129 | 625 | COUNTER(getallchildrennumber_resp) \ |
130 | 625 | COUNTER(delete_resp) \ |
131 | 625 | COUNTER(exists_resp) \ |
132 | 625 | COUNTER(getacl_resp) \ |
133 | 625 | COUNTER(setacl_resp) \ |
134 | 625 | COUNTER(sync_resp) \ |
135 | 625 | COUNTER(multi_resp) \ |
136 | 625 | COUNTER(reconfig_resp) \ |
137 | 625 | COUNTER(close_resp) \ |
138 | 625 | COUNTER(setauth_resp) \ |
139 | 625 | COUNTER(setwatches_resp) \ |
140 | 625 | COUNTER(setwatches2_resp) \ |
141 | 625 | COUNTER(addwatch_resp) \ |
142 | 625 | COUNTER(checkwatches_resp) \ |
143 | 625 | COUNTER(removewatches_resp) \ |
144 | 625 | COUNTER(check_resp) \ |
145 | 625 | COUNTER(watch_event) \ |
146 | 625 | COUNTER(connect_resp_fast) \ |
147 | 625 | COUNTER(ping_resp_fast) \ |
148 | 625 | COUNTER(auth_resp_fast) \ |
149 | 625 | COUNTER(getdata_resp_fast) \ |
150 | 625 | COUNTER(create_resp_fast) \ |
151 | 625 | COUNTER(create2_resp_fast) \ |
152 | 625 | COUNTER(createcontainer_resp_fast) \ |
153 | 625 | COUNTER(createttl_resp_fast) \ |
154 | 625 | COUNTER(setdata_resp_fast) \ |
155 | 625 | COUNTER(getchildren_resp_fast) \ |
156 | 625 | COUNTER(getchildren2_resp_fast) \ |
157 | 625 | COUNTER(getephemerals_resp_fast) \ |
158 | 625 | COUNTER(getallchildrennumber_resp_fast) \ |
159 | 625 | COUNTER(delete_resp_fast) \ |
160 | 625 | COUNTER(exists_resp_fast) \ |
161 | 625 | COUNTER(getacl_resp_fast) \ |
162 | 625 | COUNTER(setacl_resp_fast) \ |
163 | 625 | COUNTER(sync_resp_fast) \ |
164 | 625 | COUNTER(multi_resp_fast) \ |
165 | 625 | COUNTER(reconfig_resp_fast) \ |
166 | 625 | COUNTER(close_resp_fast) \ |
167 | 625 | COUNTER(setauth_resp_fast) \ |
168 | 625 | COUNTER(setwatches_resp_fast) \ |
169 | 625 | COUNTER(setwatches2_resp_fast) \ |
170 | 625 | COUNTER(addwatch_resp_fast) \ |
171 | 625 | COUNTER(checkwatches_resp_fast) \ |
172 | 625 | COUNTER(removewatches_resp_fast) \ |
173 | 625 | COUNTER(check_resp_fast) \ |
174 | 625 | COUNTER(connect_resp_slow) \ |
175 | 625 | COUNTER(ping_resp_slow) \ |
176 | 625 | COUNTER(auth_resp_slow) \ |
177 | 625 | COUNTER(getdata_resp_slow) \ |
178 | 625 | COUNTER(create_resp_slow) \ |
179 | 625 | COUNTER(create2_resp_slow) \ |
180 | 625 | COUNTER(createcontainer_resp_slow) \ |
181 | 625 | COUNTER(createttl_resp_slow) \ |
182 | 625 | COUNTER(setdata_resp_slow) \ |
183 | 625 | COUNTER(getchildren_resp_slow) \ |
184 | 625 | COUNTER(getchildren2_resp_slow) \ |
185 | 625 | COUNTER(getephemerals_resp_slow) \ |
186 | 625 | COUNTER(getallchildrennumber_resp_slow) \ |
187 | 625 | COUNTER(delete_resp_slow) \ |
188 | 625 | COUNTER(exists_resp_slow) \ |
189 | 625 | COUNTER(getacl_resp_slow) \ |
190 | 625 | COUNTER(setacl_resp_slow) \ |
191 | 625 | COUNTER(sync_resp_slow) \ |
192 | 625 | COUNTER(multi_resp_slow) \ |
193 | 625 | COUNTER(reconfig_resp_slow) \ |
194 | 625 | COUNTER(close_resp_slow) \ |
195 | 625 | COUNTER(setauth_resp_slow) \ |
196 | 625 | COUNTER(setwatches_resp_slow) \ |
197 | 625 | COUNTER(setwatches2_resp_slow) \ |
198 | 625 | COUNTER(addwatch_resp_slow) \ |
199 | 625 | COUNTER(checkwatches_resp_slow) \ |
200 | 625 | COUNTER(removewatches_resp_slow) \ |
201 | 625 | COUNTER(check_resp_slow) |
202 | | |
203 | | /** |
204 | | * Struct definition for all ZooKeeper proxy stats. @see stats_macros.h |
205 | | */ |
206 | | struct ZooKeeperProxyStats { |
207 | | ALL_ZOOKEEPER_PROXY_STATS(GENERATE_COUNTER_STRUCT) |
208 | | }; |
209 | | |
210 | | enum class ErrorBudgetResponseType { Fast, Slow, None }; |
211 | | |
212 | | using envoy::extensions::filters::network::zookeeper_proxy::v3::LatencyThresholdOverride; |
213 | | using envoy::extensions::filters::network::zookeeper_proxy::v3::LatencyThresholdOverride_Opcode; |
214 | | using LatencyThresholdOverrideList = Protobuf::RepeatedPtrField<LatencyThresholdOverride>; |
215 | | using LatencyThresholdOverrideMap = absl::flat_hash_map<int32_t, std::chrono::milliseconds>; |
216 | | using OpcodeMap = absl::flat_hash_map<LatencyThresholdOverride_Opcode, int32_t>; |
217 | | |
218 | | /** |
219 | | * Configuration for the ZooKeeper proxy filter. |
220 | | */ |
221 | | class ZooKeeperFilterConfig { |
222 | | public: |
223 | | ZooKeeperFilterConfig(const std::string& stat_prefix, const uint32_t max_packet_bytes, |
224 | | const bool enable_per_opcode_request_bytes_metrics, |
225 | | const bool enable_per_opcode_response_bytes_metrics, |
226 | | const bool enable_latency_threshold_metrics, |
227 | | const std::chrono::milliseconds default_latency_threshold, |
228 | | const LatencyThresholdOverrideList& latency_threshold_overrides, |
229 | | Stats::Scope& scope); |
230 | | |
231 | 0 | const ZooKeeperProxyStats& stats() { return stats_; } |
232 | 624 | uint32_t maxPacketBytes() const { return max_packet_bytes_; } |
233 | | |
234 | | // The OpCodeInfo is created as a public member of ZooKeeperFilterConfig. |
235 | | // Therefore, its lifetime is tied to that of ZooKeeperFilterConfig. |
236 | | // When the ZooKeeperFilterConfig object is destroyed, the OpCodeInfo will be destroyed as well. |
237 | | // The lifetime of the scope is tied to the context passed to network filters to access server |
238 | | // resources. The values of counter elements in OpCodeInfo are used to track total op-code usage, |
239 | | // as well as the StatName under which to collect the latency, fast/slow responses for that |
240 | | // op-code. The latency-name will be joined with the stat_prefix_, which varies per filter |
241 | | // instance. |
242 | | struct OpCodeInfo { |
243 | | Stats::Counter* resp_counter_; |
244 | | Stats::Counter* resp_fast_counter_; |
245 | | Stats::Counter* resp_slow_counter_; |
246 | | Stats::Counter* rq_bytes_counter_; |
247 | | Stats::Counter* resp_bytes_counter_; |
248 | | std::string opname_; |
249 | | Stats::StatName latency_name_; |
250 | | }; |
251 | | |
252 | | absl::flat_hash_map<OpCodes, OpCodeInfo> op_code_map_; |
253 | | Stats::Scope& scope_; |
254 | | const uint32_t max_packet_bytes_; |
255 | | ZooKeeperProxyStats stats_; |
256 | | Stats::StatNameSetPtr stat_name_set_; |
257 | | const Stats::StatName stat_prefix_; |
258 | | const Stats::StatName auth_; |
259 | | const Stats::StatName connect_latency_; |
260 | | const Stats::StatName unknown_scheme_rq_; |
261 | | const Stats::StatName unknown_opcode_latency_; |
262 | | const bool enable_per_opcode_request_bytes_metrics_; |
263 | | const bool enable_per_opcode_response_bytes_metrics_; |
264 | | |
265 | | ErrorBudgetResponseType errorBudgetDecision(const OpCodes opcode, |
266 | | const std::chrono::milliseconds latency) const; |
267 | | |
268 | | private: |
269 | | void initOpCode(OpCodes opcode, Stats::Counter& resp_counter, Stats::Counter& resp_fast_counter, |
270 | | Stats::Counter& resp_slow_counter, Stats::Counter& rq_bytes_counter, |
271 | | Stats::Counter& resp_bytes_counter, absl::string_view name); |
272 | | |
273 | 625 | ZooKeeperProxyStats generateStats(const std::string& prefix, Stats::Scope& scope) { |
274 | 625 | return ZooKeeperProxyStats{ALL_ZOOKEEPER_PROXY_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; |
275 | 625 | } |
276 | | |
277 | 427 | static const OpcodeMap& opcodeMap() { |
278 | 427 | CONSTRUCT_ON_FIRST_USE(OpcodeMap, {{LatencyThresholdOverride::Connect, 0}, |
279 | 427 | {LatencyThresholdOverride::Create, 1}, |
280 | 427 | {LatencyThresholdOverride::Delete, 2}, |
281 | 427 | {LatencyThresholdOverride::Exists, 3}, |
282 | 427 | {LatencyThresholdOverride::GetData, 4}, |
283 | 427 | {LatencyThresholdOverride::SetData, 5}, |
284 | 427 | {LatencyThresholdOverride::GetAcl, 6}, |
285 | 427 | {LatencyThresholdOverride::SetAcl, 7}, |
286 | 427 | {LatencyThresholdOverride::GetChildren, 8}, |
287 | 427 | {LatencyThresholdOverride::Sync, 9}, |
288 | 427 | {LatencyThresholdOverride::Ping, 11}, |
289 | 427 | {LatencyThresholdOverride::GetChildren2, 12}, |
290 | 427 | {LatencyThresholdOverride::Check, 13}, |
291 | 427 | {LatencyThresholdOverride::Multi, 14}, |
292 | 427 | {LatencyThresholdOverride::Create2, 15}, |
293 | 427 | {LatencyThresholdOverride::Reconfig, 16}, |
294 | 427 | {LatencyThresholdOverride::CheckWatches, 17}, |
295 | 427 | {LatencyThresholdOverride::RemoveWatches, 18}, |
296 | 427 | {LatencyThresholdOverride::CreateContainer, 19}, |
297 | 427 | {LatencyThresholdOverride::CreateTtl, 21}, |
298 | 427 | {LatencyThresholdOverride::Close, -11}, |
299 | 427 | {LatencyThresholdOverride::SetAuth, 100}, |
300 | 427 | {LatencyThresholdOverride::SetWatches, 101}, |
301 | 427 | {LatencyThresholdOverride::GetEphemerals, 103}, |
302 | 427 | {LatencyThresholdOverride::GetAllChildrenNumber, 104}, |
303 | 427 | {LatencyThresholdOverride::SetWatches2, 105}, |
304 | 427 | {LatencyThresholdOverride::AddWatch, 106}}); |
305 | 427 | } |
306 | | |
307 | | int32_t getOpCodeIndex(LatencyThresholdOverride_Opcode opcode); |
308 | | LatencyThresholdOverrideMap |
309 | | parseLatencyThresholdOverrides(const LatencyThresholdOverrideList& latency_threshold_overrides); |
310 | | |
311 | | const bool enable_latency_threshold_metrics_; |
312 | | const std::chrono::milliseconds default_latency_threshold_; |
313 | | // Key: opcode enum value defined in decoder.h, value: latency threshold override in milliseconds. |
314 | | const LatencyThresholdOverrideMap latency_threshold_override_map_; |
315 | | }; |
316 | | |
317 | | using ZooKeeperFilterConfigSharedPtr = std::shared_ptr<ZooKeeperFilterConfig>; |
318 | | |
319 | | /** |
320 | | * Implementation of ZooKeeper proxy filter. |
321 | | */ |
322 | | class ZooKeeperFilter : public Network::Filter, |
323 | | DecoderCallbacks, |
324 | | Logger::Loggable<Logger::Id::filter> { |
325 | | public: |
326 | | ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config, TimeSource& time_source); |
327 | | |
328 | | // Network::ReadFilter |
329 | | Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; |
330 | | Network::FilterStatus onNewConnection() override; |
331 | | void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; |
332 | | |
333 | | // Network::WriteFilter |
334 | | Network::FilterStatus onWrite(Buffer::Instance& data, bool end_stream) override; |
335 | | |
336 | | // ZooKeeperProxy::DecoderCallbacks |
337 | | void onDecodeError() override; |
338 | | void onRequestBytes(const absl::optional<OpCodes> opcode, const uint64_t bytes) override; |
339 | | void onConnect(bool readonly) override; |
340 | | void onPing() override; |
341 | | void onAuthRequest(const std::string& scheme) override; |
342 | | void onGetDataRequest(const std::string& path, bool watch) override; |
343 | | void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) override; |
344 | | void onSetRequest(const std::string& path) override; |
345 | | void onGetChildrenRequest(const std::string& path, bool watch, bool v2) override; |
346 | | void onDeleteRequest(const std::string& path, int32_t version) override; |
347 | | void onExistsRequest(const std::string& path, bool watch) override; |
348 | | void onGetAclRequest(const std::string& path) override; |
349 | | void onSetAclRequest(const std::string& path, int32_t version) override; |
350 | | void onSyncRequest(const std::string& path) override; |
351 | | void onCheckRequest(const std::string& path, int32_t version) override; |
352 | | void onMultiRequest() override; |
353 | | void onReconfigRequest() override; |
354 | | void onSetWatchesRequest() override; |
355 | | void onSetWatches2Request() override; |
356 | | void onAddWatchRequest(const std::string& path, const int32_t mode) override; |
357 | | void onCheckWatchesRequest(const std::string& path, int32_t type) override; |
358 | | void onRemoveWatchesRequest(const std::string& path, int32_t type) override; |
359 | | void onGetEphemeralsRequest(const std::string& path) override; |
360 | | void onGetAllChildrenNumberRequest(const std::string& path) override; |
361 | | void onCloseRequest() override; |
362 | | void onResponseBytes(const absl::optional<OpCodes> opcode, const uint64_t bytes) override; |
363 | | void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly, |
364 | | const std::chrono::milliseconds latency) override; |
365 | | void onResponse(OpCodes opcode, int32_t xid, int64_t zxid, int32_t error, |
366 | | const std::chrono::milliseconds latency) override; |
367 | | void onWatchEvent(int32_t event_type, int32_t client_state, const std::string& path, int64_t zxid, |
368 | | int32_t error) override; |
369 | | |
370 | | DecoderPtr createDecoder(DecoderCallbacks& callbacks, TimeSource& time_source); |
371 | | void setDynamicMetadata(const std::string& key, const std::string& value); |
372 | | void setDynamicMetadata(const std::vector<std::pair<const std::string, const std::string>>& data); |
373 | | void clearDynamicMetadata(); |
374 | | |
375 | | private: |
376 | | Network::ReadFilterCallbacks* read_callbacks_{}; |
377 | | ZooKeeperFilterConfigSharedPtr config_; |
378 | | std::unique_ptr<Decoder> decoder_; |
379 | | }; |
380 | | |
381 | | } // namespace ZooKeeperProxy |
382 | | } // namespace NetworkFilters |
383 | | } // namespace Extensions |
384 | | } // namespace Envoy |