/proc/self/cwd/source/extensions/filters/network/zookeeper_proxy/filter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/filters/network/zookeeper_proxy/filter.h" |
2 | | |
3 | | #include <string> |
4 | | #include <vector> |
5 | | |
6 | | #include "envoy/config/core/v3/base.pb.h" |
7 | | #include "envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.pb.h" |
8 | | #include "envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.pb.validate.h" |
9 | | |
10 | | #include "source/common/buffer/buffer_impl.h" |
11 | | #include "source/common/common/assert.h" |
12 | | #include "source/common/common/enum_to_int.h" |
13 | | #include "source/common/common/fmt.h" |
14 | | #include "source/common/common/logger.h" |
15 | | #include "source/common/stats/utility.h" |
16 | | #include "source/extensions/filters/network/well_known_names.h" |
17 | | |
18 | | namespace Envoy { |
19 | | namespace Extensions { |
20 | | namespace NetworkFilters { |
21 | | namespace ZooKeeperProxy { |
22 | | |
23 | | ZooKeeperFilterConfig::ZooKeeperFilterConfig( |
24 | | const std::string& stat_prefix, const uint32_t max_packet_bytes, |
25 | | const bool enable_per_opcode_request_bytes_metrics, |
26 | | const bool enable_per_opcode_response_bytes_metrics, |
27 | | const bool enable_latency_threshold_metrics, |
28 | | const std::chrono::milliseconds default_latency_threshold, |
29 | | const LatencyThresholdOverrideList& latency_threshold_overrides, Stats::Scope& scope) |
30 | | : scope_(scope), max_packet_bytes_(max_packet_bytes), stats_(generateStats(stat_prefix, scope)), |
31 | | stat_name_set_(scope.symbolTable().makeSet("Zookeeper")), |
32 | | stat_prefix_(stat_name_set_->add(stat_prefix)), auth_(stat_name_set_->add("auth")), |
33 | | connect_latency_(stat_name_set_->add("connect_response_latency")), |
34 | | unknown_scheme_rq_(stat_name_set_->add("unknown_scheme_rq")), |
35 | | unknown_opcode_latency_(stat_name_set_->add("unknown_opcode_latency")), |
36 | | enable_per_opcode_request_bytes_metrics_(enable_per_opcode_request_bytes_metrics), |
37 | | enable_per_opcode_response_bytes_metrics_(enable_per_opcode_response_bytes_metrics), |
38 | | enable_latency_threshold_metrics_(enable_latency_threshold_metrics), |
39 | | default_latency_threshold_(default_latency_threshold), |
40 | 625 | latency_threshold_override_map_(parseLatencyThresholdOverrides(latency_threshold_overrides)) { |
41 | | // https://zookeeper.apache.org/doc/r3.5.4-beta/zookeeperProgrammers.html#sc_BuiltinACLSchemes |
42 | | // lists commons schemes: "world", "auth", "digest", "host", "x509", and |
43 | | // "ip". These are used in filter.cc by appending "_rq". |
44 | 625 | stat_name_set_->rememberBuiltins( |
45 | 625 | {"auth_rq", "digest_rq", "host_rq", "ip_rq", "ping_response_rq", "world_rq", "x509_rq"}); |
46 | | |
47 | 625 | initOpCode(OpCodes::Ping, stats_.ping_resp_, stats_.ping_resp_fast_, stats_.ping_resp_slow_, |
48 | 625 | stats_.ping_rq_bytes_, stats_.ping_resp_bytes_, "ping_response"); |
49 | 625 | initOpCode(OpCodes::SetAuth, stats_.auth_resp_, stats_.auth_resp_fast_, stats_.auth_resp_slow_, |
50 | 625 | stats_.auth_rq_bytes_, stats_.auth_resp_bytes_, "auth_response"); |
51 | 625 | initOpCode(OpCodes::GetData, stats_.getdata_resp_, stats_.getdata_resp_fast_, |
52 | 625 | stats_.getdata_resp_slow_, stats_.getdata_rq_bytes_, stats_.getdata_resp_bytes_, |
53 | 625 | "getdata_resp"); |
54 | 625 | initOpCode(OpCodes::Create, stats_.create_resp_, stats_.create_resp_fast_, |
55 | 625 | stats_.create_resp_slow_, stats_.create_rq_bytes_, stats_.create_resp_bytes_, |
56 | 625 | "create_resp"); |
57 | 625 | initOpCode(OpCodes::Create2, stats_.create2_resp_, stats_.create2_resp_fast_, |
58 | 625 | stats_.create2_resp_slow_, stats_.create2_rq_bytes_, stats_.create2_resp_bytes_, |
59 | 625 | "create2_resp"); |
60 | 625 | initOpCode(OpCodes::CreateContainer, stats_.createcontainer_resp_, |
61 | 625 | stats_.createcontainer_resp_fast_, stats_.createcontainer_resp_slow_, |
62 | 625 | stats_.createcontainer_rq_bytes_, stats_.createcontainer_resp_bytes_, |
63 | 625 | "createcontainer_resp"); |
64 | 625 | initOpCode(OpCodes::CreateTtl, stats_.createttl_resp_, stats_.createttl_resp_fast_, |
65 | 625 | stats_.createttl_resp_slow_, stats_.createttl_rq_bytes_, stats_.createttl_resp_bytes_, |
66 | 625 | "createttl_resp"); |
67 | 625 | initOpCode(OpCodes::SetData, stats_.setdata_resp_, stats_.setdata_resp_fast_, |
68 | 625 | stats_.setdata_resp_slow_, stats_.setdata_rq_bytes_, stats_.setdata_resp_bytes_, |
69 | 625 | "setdata_resp"); |
70 | 625 | initOpCode(OpCodes::GetChildren, stats_.getchildren_resp_, stats_.getchildren_resp_fast_, |
71 | 625 | stats_.getchildren_resp_slow_, stats_.getchildren_rq_bytes_, |
72 | 625 | stats_.getchildren_resp_bytes_, "getchildren_resp"); |
73 | 625 | initOpCode(OpCodes::GetChildren2, stats_.getchildren2_resp_, stats_.getchildren2_resp_fast_, |
74 | 625 | stats_.getchildren2_resp_slow_, stats_.getchildren2_rq_bytes_, |
75 | 625 | stats_.getchildren2_resp_bytes_, "getchildren2_resp"); |
76 | 625 | initOpCode(OpCodes::Delete, stats_.delete_resp_, stats_.delete_resp_fast_, |
77 | 625 | stats_.delete_resp_slow_, stats_.delete_rq_bytes_, stats_.delete_resp_bytes_, |
78 | 625 | "delete_resp"); |
79 | 625 | initOpCode(OpCodes::Exists, stats_.exists_resp_, stats_.exists_resp_fast_, |
80 | 625 | stats_.exists_resp_slow_, stats_.exists_rq_bytes_, stats_.exists_resp_bytes_, |
81 | 625 | "exists_resp"); |
82 | 625 | initOpCode(OpCodes::GetAcl, stats_.getacl_resp_, stats_.getacl_resp_fast_, |
83 | 625 | stats_.getacl_resp_slow_, stats_.getacl_rq_bytes_, stats_.getacl_resp_bytes_, |
84 | 625 | "getacl_resp"); |
85 | 625 | initOpCode(OpCodes::SetAcl, stats_.setacl_resp_, stats_.setacl_resp_fast_, |
86 | 625 | stats_.setacl_resp_slow_, stats_.setacl_rq_bytes_, stats_.setacl_resp_bytes_, |
87 | 625 | "setacl_resp"); |
88 | 625 | initOpCode(OpCodes::Sync, stats_.sync_resp_, stats_.sync_resp_fast_, stats_.sync_resp_slow_, |
89 | 625 | stats_.sync_rq_bytes_, stats_.sync_resp_bytes_, "sync_resp"); |
90 | 625 | initOpCode(OpCodes::Check, stats_.check_resp_, stats_.check_resp_fast_, stats_.check_resp_slow_, |
91 | 625 | stats_.check_rq_bytes_, stats_.check_resp_bytes_, "check_resp"); |
92 | 625 | initOpCode(OpCodes::Multi, stats_.multi_resp_, stats_.multi_resp_fast_, stats_.multi_resp_slow_, |
93 | 625 | stats_.multi_rq_bytes_, stats_.multi_resp_bytes_, "multi_resp"); |
94 | 625 | initOpCode(OpCodes::Reconfig, stats_.reconfig_resp_, stats_.reconfig_resp_fast_, |
95 | 625 | stats_.reconfig_resp_slow_, stats_.reconfig_rq_bytes_, stats_.reconfig_resp_bytes_, |
96 | 625 | "reconfig_resp"); |
97 | 625 | initOpCode(OpCodes::SetWatches, stats_.setwatches_resp_, stats_.setwatches_resp_fast_, |
98 | 625 | stats_.setwatches_resp_slow_, stats_.setwatches_rq_bytes_, |
99 | 625 | stats_.setwatches_resp_bytes_, "setwatches_resp"); |
100 | 625 | initOpCode(OpCodes::SetWatches2, stats_.setwatches2_resp_, stats_.setwatches2_resp_fast_, |
101 | 625 | stats_.setwatches2_resp_slow_, stats_.setwatches2_rq_bytes_, |
102 | 625 | stats_.setwatches2_resp_bytes_, "setwatches2_resp"); |
103 | 625 | initOpCode(OpCodes::AddWatch, stats_.addwatch_resp_, stats_.addwatch_resp_fast_, |
104 | 625 | stats_.addwatch_resp_slow_, stats_.addwatch_rq_bytes_, stats_.addwatch_resp_bytes_, |
105 | 625 | "addwatch_resp"); |
106 | 625 | initOpCode(OpCodes::CheckWatches, stats_.checkwatches_resp_, stats_.checkwatches_resp_fast_, |
107 | 625 | stats_.checkwatches_resp_slow_, stats_.checkwatches_rq_bytes_, |
108 | 625 | stats_.checkwatches_resp_bytes_, "checkwatches_resp"); |
109 | 625 | initOpCode(OpCodes::RemoveWatches, stats_.removewatches_resp_, stats_.removewatches_resp_fast_, |
110 | 625 | stats_.removewatches_resp_slow_, stats_.removewatches_rq_bytes_, |
111 | 625 | stats_.removewatches_resp_bytes_, "removewatches_resp"); |
112 | 625 | initOpCode(OpCodes::GetEphemerals, stats_.getephemerals_resp_, stats_.getephemerals_resp_fast_, |
113 | 625 | stats_.getephemerals_resp_slow_, stats_.getephemerals_rq_bytes_, |
114 | 625 | stats_.getephemerals_resp_bytes_, "getephemerals_resp"); |
115 | 625 | initOpCode(OpCodes::GetAllChildrenNumber, stats_.getallchildrennumber_resp_, |
116 | 625 | stats_.getallchildrennumber_resp_fast_, stats_.getallchildrennumber_resp_slow_, |
117 | 625 | stats_.getallchildrennumber_rq_bytes_, stats_.getallchildrennumber_resp_bytes_, |
118 | 625 | "getallchildrennumber_resp"); |
119 | 625 | initOpCode(OpCodes::Close, stats_.close_resp_, stats_.close_resp_fast_, stats_.close_resp_slow_, |
120 | 625 | stats_.close_rq_bytes_, stats_.close_resp_bytes_, "close_resp"); |
121 | 625 | } |
122 | | |
123 | | ErrorBudgetResponseType |
124 | | ZooKeeperFilterConfig::errorBudgetDecision(const OpCodes opcode, |
125 | 0 | const std::chrono::milliseconds latency) const { |
126 | 0 | if (!enable_latency_threshold_metrics_) { |
127 | 0 | return ErrorBudgetResponseType::None; |
128 | 0 | } |
129 | | // Set latency threshold for the current opcode. |
130 | 0 | std::chrono::milliseconds latency_threshold = default_latency_threshold_; |
131 | 0 | int32_t opcode_val = enumToSignedInt(opcode); |
132 | 0 | auto it = latency_threshold_override_map_.find(opcode_val); |
133 | 0 | if (it != latency_threshold_override_map_.end()) { |
134 | 0 | latency_threshold = it->second; |
135 | 0 | } |
136 | | |
137 | | // Determine fast/slow response based on the threshold. |
138 | 0 | if (latency <= latency_threshold) { |
139 | 0 | return ErrorBudgetResponseType::Fast; |
140 | 0 | } |
141 | | |
142 | 0 | return ErrorBudgetResponseType::Slow; |
143 | 0 | } |
144 | | |
145 | | void ZooKeeperFilterConfig::initOpCode(OpCodes opcode, Stats::Counter& resp_counter, |
146 | | Stats::Counter& resp_fast_counter, |
147 | | Stats::Counter& resp_slow_counter, |
148 | | Stats::Counter& rq_bytes_counter, |
149 | 16.2k | Stats::Counter& resp_bytes_counter, absl::string_view name) { |
150 | 16.2k | OpCodeInfo& opcode_info = op_code_map_[opcode]; |
151 | 16.2k | opcode_info.resp_counter_ = &resp_counter; |
152 | 16.2k | opcode_info.resp_fast_counter_ = &resp_fast_counter; |
153 | 16.2k | opcode_info.resp_slow_counter_ = &resp_slow_counter; |
154 | 16.2k | opcode_info.rq_bytes_counter_ = &rq_bytes_counter; |
155 | 16.2k | opcode_info.resp_bytes_counter_ = &resp_bytes_counter; |
156 | 16.2k | opcode_info.opname_ = std::string(name); |
157 | 16.2k | opcode_info.latency_name_ = stat_name_set_->add(absl::StrCat(name, "_latency")); |
158 | 16.2k | } |
159 | | |
160 | 427 | int32_t ZooKeeperFilterConfig::getOpCodeIndex(LatencyThresholdOverride_Opcode opcode) { |
161 | 427 | const OpcodeMap& opcode_map = opcodeMap(); |
162 | 427 | auto it = opcode_map.find(opcode); |
163 | 427 | if (it != opcode_map.end()) { |
164 | 427 | return it->second; |
165 | 427 | } |
166 | 0 | throw EnvoyException(fmt::format("Unknown opcode from config: {}", static_cast<int32_t>(opcode))); |
167 | 427 | } |
168 | | |
169 | | LatencyThresholdOverrideMap ZooKeeperFilterConfig::parseLatencyThresholdOverrides( |
170 | 625 | const LatencyThresholdOverrideList& latency_threshold_overrides) { |
171 | 625 | LatencyThresholdOverrideMap latency_threshold_override_map; |
172 | 625 | for (const auto& threshold_override : latency_threshold_overrides) { |
173 | 428 | latency_threshold_override_map[getOpCodeIndex(threshold_override.opcode())] = |
174 | 428 | std::chrono::milliseconds(PROTOBUF_GET_MS_REQUIRED(threshold_override, threshold)); |
175 | 428 | } |
176 | 625 | return latency_threshold_override_map; |
177 | 625 | } |
178 | | |
179 | | ZooKeeperFilter::ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config, TimeSource& time_source) |
180 | 624 | : config_(std::move(config)), decoder_(createDecoder(*this, time_source)) {} |
181 | | |
182 | 624 | void ZooKeeperFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { |
183 | 624 | read_callbacks_ = &callbacks; |
184 | 624 | } |
185 | | |
186 | 0 | Network::FilterStatus ZooKeeperFilter::onData(Buffer::Instance& data, bool) { |
187 | 0 | clearDynamicMetadata(); |
188 | 0 | return decoder_->onData(data); |
189 | 0 | } |
190 | | |
191 | 9.54k | Network::FilterStatus ZooKeeperFilter::onWrite(Buffer::Instance& data, bool) { |
192 | 9.54k | clearDynamicMetadata(); |
193 | 9.54k | return decoder_->onWrite(data); |
194 | 9.54k | } |
195 | | |
196 | 0 | Network::FilterStatus ZooKeeperFilter::onNewConnection() { return Network::FilterStatus::Continue; } |
197 | | |
198 | 624 | DecoderPtr ZooKeeperFilter::createDecoder(DecoderCallbacks& callbacks, TimeSource& time_source) { |
199 | 624 | return std::make_unique<DecoderImpl>(callbacks, config_->maxPacketBytes(), time_source); |
200 | 624 | } |
201 | | |
202 | 6.16k | void ZooKeeperFilter::setDynamicMetadata(const std::string& key, const std::string& value) { |
203 | 6.16k | setDynamicMetadata({{key, value}}); |
204 | 6.16k | } |
205 | | |
206 | 9.54k | void ZooKeeperFilter::clearDynamicMetadata() { |
207 | 9.54k | envoy::config::core::v3::Metadata& dynamic_metadata = |
208 | 9.54k | read_callbacks_->connection().streamInfo().dynamicMetadata(); |
209 | 9.54k | auto& metadata = |
210 | 9.54k | (*dynamic_metadata.mutable_filter_metadata())[NetworkFilterNames::get().ZooKeeperProxy]; |
211 | 9.54k | metadata.mutable_fields()->clear(); |
212 | 9.54k | } |
213 | | |
214 | | void ZooKeeperFilter::setDynamicMetadata( |
215 | 9.14k | const std::vector<std::pair<const std::string, const std::string>>& data) { |
216 | 9.14k | envoy::config::core::v3::Metadata& dynamic_metadata = |
217 | 9.14k | read_callbacks_->connection().streamInfo().dynamicMetadata(); |
218 | 9.14k | ProtobufWkt::Struct metadata( |
219 | 9.14k | (*dynamic_metadata.mutable_filter_metadata())[NetworkFilterNames::get().ZooKeeperProxy]); |
220 | 9.14k | auto& fields = *metadata.mutable_fields(); |
221 | | |
222 | 24.0k | for (const auto& pair : data) { |
223 | 24.0k | auto val = ProtobufWkt::Value(); |
224 | 24.0k | val.set_string_value(pair.second); |
225 | 24.0k | fields.insert({pair.first, val}); |
226 | 24.0k | } |
227 | | |
228 | 9.14k | read_callbacks_->connection().streamInfo().setDynamicMetadata( |
229 | 9.14k | NetworkFilterNames::get().ZooKeeperProxy, metadata); |
230 | 9.14k | } |
231 | | |
232 | 0 | void ZooKeeperFilter::onConnect(const bool readonly) { |
233 | 0 | if (readonly) { |
234 | 0 | config_->stats_.connect_readonly_rq_.inc(); |
235 | 0 | setDynamicMetadata("opname", "connect_readonly"); |
236 | 0 | } else { |
237 | 0 | config_->stats_.connect_rq_.inc(); |
238 | 0 | setDynamicMetadata("opname", "connect"); |
239 | 0 | } |
240 | 0 | } |
241 | | |
242 | 3.18k | void ZooKeeperFilter::onDecodeError() { |
243 | 3.18k | config_->stats_.decoder_error_.inc(); |
244 | 3.18k | setDynamicMetadata("opname", "error"); |
245 | 3.18k | } |
246 | | |
247 | 0 | void ZooKeeperFilter::onRequestBytes(const absl::optional<OpCodes> opcode, const uint64_t bytes) { |
248 | 0 | config_->stats_.request_bytes_.add(bytes); |
249 | |
|
250 | 0 | if (config_->enable_per_opcode_request_bytes_metrics_ && opcode.has_value()) { |
251 | 0 | if (*opcode == OpCodes::Connect) { |
252 | 0 | config_->stats_.connect_rq_bytes_.add(bytes); |
253 | 0 | } else { |
254 | 0 | ASSERT(config_->op_code_map_.contains(*opcode)); |
255 | 0 | config_->op_code_map_[*opcode].rq_bytes_counter_->add(bytes); |
256 | 0 | } |
257 | 0 | } |
258 | | |
259 | 0 | setDynamicMetadata("bytes", std::to_string(bytes)); |
260 | 0 | } |
261 | | |
262 | 2.98k | void ZooKeeperFilter::onResponseBytes(const absl::optional<OpCodes> opcode, const uint64_t bytes) { |
263 | 2.98k | config_->stats_.response_bytes_.add(bytes); |
264 | | |
265 | 2.98k | if (config_->enable_per_opcode_response_bytes_metrics_ && opcode.has_value()) { |
266 | 0 | if (*opcode == OpCodes::Connect) { |
267 | 0 | config_->stats_.connect_resp_bytes_.add(bytes); |
268 | 0 | } else { |
269 | 0 | ASSERT(config_->op_code_map_.contains(*opcode)); |
270 | 0 | config_->op_code_map_[*opcode].resp_bytes_counter_->add(bytes); |
271 | 0 | } |
272 | 0 | } |
273 | | |
274 | 2.98k | setDynamicMetadata("bytes", std::to_string(bytes)); |
275 | 2.98k | } |
276 | | |
277 | 0 | void ZooKeeperFilter::onPing() { |
278 | 0 | config_->stats_.ping_rq_.inc(); |
279 | 0 | setDynamicMetadata("opname", "ping"); |
280 | 0 | } |
281 | | |
282 | 0 | void ZooKeeperFilter::onAuthRequest(const std::string& scheme) { |
283 | 0 | Stats::Counter& counter = Stats::Utility::counterFromStatNames( |
284 | 0 | config_->scope_, {config_->stat_prefix_, config_->auth_, |
285 | 0 | config_->stat_name_set_->getBuiltin(absl::StrCat(scheme, "_rq"), |
286 | 0 | config_->unknown_scheme_rq_)}); |
287 | 0 | counter.inc(); |
288 | 0 | setDynamicMetadata("opname", "auth"); |
289 | 0 | } |
290 | | |
291 | 0 | void ZooKeeperFilter::onGetDataRequest(const std::string& path, const bool watch) { |
292 | 0 | config_->stats_.getdata_rq_.inc(); |
293 | 0 | setDynamicMetadata({{"opname", "getdata"}, {"path", path}, {"watch", watch ? "true" : "false"}}); |
294 | 0 | } |
295 | | |
296 | | void ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags, |
297 | 0 | const OpCodes opcode) { |
298 | 0 | std::string opname; |
299 | |
|
300 | 0 | switch (opcode) { |
301 | 0 | case OpCodes::Create: |
302 | 0 | opname = "create"; |
303 | 0 | config_->stats_.create_rq_.inc(); |
304 | 0 | break; |
305 | 0 | case OpCodes::Create2: |
306 | 0 | opname = "create2"; |
307 | 0 | config_->stats_.create2_rq_.inc(); |
308 | 0 | break; |
309 | 0 | case OpCodes::CreateContainer: |
310 | 0 | opname = "createcontainer"; |
311 | 0 | config_->stats_.createcontainer_rq_.inc(); |
312 | 0 | break; |
313 | 0 | case OpCodes::CreateTtl: |
314 | 0 | opname = "createttl"; |
315 | 0 | config_->stats_.createttl_rq_.inc(); |
316 | 0 | break; |
317 | 0 | default: |
318 | 0 | throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode))); |
319 | 0 | break; |
320 | 0 | } |
321 | | |
322 | 0 | setDynamicMetadata( |
323 | 0 | {{"opname", opname}, {"path", path}, {"create_type", createFlagsToString(flags)}}); |
324 | 0 | } |
325 | | |
326 | 0 | void ZooKeeperFilter::onSetRequest(const std::string& path) { |
327 | 0 | config_->stats_.setdata_rq_.inc(); |
328 | 0 | setDynamicMetadata({{"opname", "setdata"}, {"path", path}}); |
329 | 0 | } |
330 | | |
331 | | void ZooKeeperFilter::onGetChildrenRequest(const std::string& path, const bool watch, |
332 | 0 | const bool v2) { |
333 | 0 | std::string opname = "getchildren"; |
334 | |
|
335 | 0 | if (v2) { |
336 | 0 | config_->stats_.getchildren2_rq_.inc(); |
337 | 0 | opname = "getchildren2"; |
338 | 0 | } else { |
339 | 0 | config_->stats_.getchildren_rq_.inc(); |
340 | 0 | } |
341 | |
|
342 | 0 | setDynamicMetadata({{"opname", opname}, {"path", path}, {"watch", watch ? "true" : "false"}}); |
343 | 0 | } |
344 | | |
345 | 0 | void ZooKeeperFilter::onDeleteRequest(const std::string& path, const int32_t version) { |
346 | 0 | config_->stats_.delete_rq_.inc(); |
347 | 0 | setDynamicMetadata({{"opname", "delete"}, {"path", path}, {"version", std::to_string(version)}}); |
348 | 0 | } |
349 | | |
350 | 0 | void ZooKeeperFilter::onExistsRequest(const std::string& path, const bool watch) { |
351 | 0 | config_->stats_.exists_rq_.inc(); |
352 | 0 | setDynamicMetadata({{"opname", "exists"}, {"path", path}, {"watch", watch ? "true" : "false"}}); |
353 | 0 | } |
354 | | |
355 | 0 | void ZooKeeperFilter::onGetAclRequest(const std::string& path) { |
356 | 0 | config_->stats_.getacl_rq_.inc(); |
357 | 0 | setDynamicMetadata({{"opname", "getacl"}, {"path", path}}); |
358 | 0 | } |
359 | | |
360 | 0 | void ZooKeeperFilter::onSetAclRequest(const std::string& path, const int32_t version) { |
361 | 0 | config_->stats_.setacl_rq_.inc(); |
362 | 0 | setDynamicMetadata({{"opname", "setacl"}, {"path", path}, {"version", std::to_string(version)}}); |
363 | 0 | } |
364 | | |
365 | 0 | void ZooKeeperFilter::onSyncRequest(const std::string& path) { |
366 | 0 | config_->stats_.sync_rq_.inc(); |
367 | 0 | setDynamicMetadata({{"opname", "sync"}, {"path", path}}); |
368 | 0 | } |
369 | | |
370 | 0 | void ZooKeeperFilter::onCheckRequest(const std::string&, const int32_t) { |
371 | 0 | config_->stats_.check_rq_.inc(); |
372 | 0 | } |
373 | | |
374 | 0 | void ZooKeeperFilter::onCheckWatchesRequest(const std::string& path, const int32_t) { |
375 | 0 | config_->stats_.checkwatches_rq_.inc(); |
376 | 0 | setDynamicMetadata({{"opname", "checkwatches"}, {"path", path}}); |
377 | 0 | } |
378 | | |
379 | 0 | void ZooKeeperFilter::onRemoveWatchesRequest(const std::string& path, const int32_t) { |
380 | 0 | config_->stats_.removewatches_rq_.inc(); |
381 | 0 | setDynamicMetadata({{"opname", "removewatches"}, {"path", path}}); |
382 | 0 | } |
383 | | |
384 | 0 | void ZooKeeperFilter::onMultiRequest() { |
385 | 0 | config_->stats_.multi_rq_.inc(); |
386 | 0 | setDynamicMetadata("opname", "multi"); |
387 | 0 | } |
388 | | |
389 | 0 | void ZooKeeperFilter::onReconfigRequest() { |
390 | 0 | config_->stats_.reconfig_rq_.inc(); |
391 | 0 | setDynamicMetadata("opname", "reconfig"); |
392 | 0 | } |
393 | | |
394 | 0 | void ZooKeeperFilter::onSetWatchesRequest() { |
395 | 0 | config_->stats_.setwatches_rq_.inc(); |
396 | 0 | setDynamicMetadata("opname", "setwatches"); |
397 | 0 | } |
398 | | |
399 | 0 | void ZooKeeperFilter::onSetWatches2Request() { |
400 | 0 | config_->stats_.setwatches2_rq_.inc(); |
401 | 0 | setDynamicMetadata("opname", "setwatches2"); |
402 | 0 | } |
403 | | |
404 | 0 | void ZooKeeperFilter::onAddWatchRequest(const std::string& path, const int32_t mode) { |
405 | 0 | config_->stats_.addwatch_rq_.inc(); |
406 | 0 | setDynamicMetadata({{"opname", "addwatch"}, {"path", path}, {"mode", std::to_string(mode)}}); |
407 | 0 | } |
408 | | |
409 | 0 | void ZooKeeperFilter::onGetEphemeralsRequest(const std::string& path) { |
410 | 0 | config_->stats_.getephemerals_rq_.inc(); |
411 | 0 | setDynamicMetadata({{"opname", "getephemerals"}, {"path", path}}); |
412 | 0 | } |
413 | | |
414 | 0 | void ZooKeeperFilter::onGetAllChildrenNumberRequest(const std::string& path) { |
415 | 0 | config_->stats_.getallchildrennumber_rq_.inc(); |
416 | 0 | setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path}}); |
417 | 0 | } |
418 | | |
419 | 0 | void ZooKeeperFilter::onCloseRequest() { |
420 | 0 | config_->stats_.close_rq_.inc(); |
421 | 0 | setDynamicMetadata("opname", "close"); |
422 | 0 | } |
423 | | |
424 | | void ZooKeeperFilter::onConnectResponse(const int32_t proto_version, const int32_t timeout, |
425 | | const bool readonly, |
426 | 0 | const std::chrono::milliseconds latency) { |
427 | 0 | config_->stats_.connect_resp_.inc(); |
428 | |
|
429 | 0 | switch (config_->errorBudgetDecision(OpCodes::Connect, latency)) { |
430 | 0 | case ErrorBudgetResponseType::Fast: |
431 | 0 | config_->stats_.connect_resp_fast_.inc(); |
432 | 0 | break; |
433 | 0 | case ErrorBudgetResponseType::Slow: |
434 | 0 | config_->stats_.connect_resp_slow_.inc(); |
435 | 0 | break; |
436 | 0 | case ErrorBudgetResponseType::None: |
437 | 0 | break; |
438 | 0 | } |
439 | | |
440 | 0 | Stats::Histogram& histogram = Stats::Utility::histogramFromElements( |
441 | 0 | config_->scope_, {config_->stat_prefix_, config_->connect_latency_}, |
442 | 0 | Stats::Histogram::Unit::Milliseconds); |
443 | 0 | histogram.recordValue(latency.count()); |
444 | |
|
445 | 0 | setDynamicMetadata({{"opname", "connect_response"}, |
446 | 0 | {"protocol_version", std::to_string(proto_version)}, |
447 | 0 | {"timeout", std::to_string(timeout)}, |
448 | 0 | {"readonly", std::to_string(readonly)}}); |
449 | 0 | } |
450 | | |
451 | | void ZooKeeperFilter::onResponse(const OpCodes opcode, const int32_t xid, const int64_t zxid, |
452 | 0 | const int32_t error, const std::chrono::milliseconds latency) { |
453 | 0 | Stats::StatName opcode_latency = config_->unknown_opcode_latency_; |
454 | 0 | std::string opname = ""; |
455 | 0 | auto iter = config_->op_code_map_.find(opcode); |
456 | 0 | if (iter != config_->op_code_map_.end()) { |
457 | 0 | const ZooKeeperFilterConfig::OpCodeInfo& opcode_info = iter->second; |
458 | 0 | opcode_info.resp_counter_->inc(); |
459 | 0 | opcode_latency = opcode_info.latency_name_; |
460 | 0 | opname = opcode_info.opname_; |
461 | |
|
462 | 0 | switch (config_->errorBudgetDecision(opcode, latency)) { |
463 | 0 | case ErrorBudgetResponseType::Fast: |
464 | 0 | opcode_info.resp_fast_counter_->inc(); |
465 | 0 | break; |
466 | 0 | case ErrorBudgetResponseType::Slow: |
467 | 0 | opcode_info.resp_slow_counter_->inc(); |
468 | 0 | break; |
469 | 0 | case ErrorBudgetResponseType::None: |
470 | 0 | break; |
471 | 0 | } |
472 | 0 | } |
473 | | |
474 | 0 | Stats::Histogram& histogram = Stats::Utility::histogramFromStatNames( |
475 | 0 | config_->scope_, {config_->stat_prefix_, opcode_latency}, |
476 | 0 | Stats::Histogram::Unit::Milliseconds); |
477 | 0 | histogram.recordValue(latency.count()); |
478 | |
|
479 | 0 | setDynamicMetadata({{"opname", opname}, |
480 | 0 | {"xid", std::to_string(xid)}, |
481 | 0 | {"zxid", std::to_string(zxid)}, |
482 | 0 | {"error", std::to_string(error)}}); |
483 | 0 | } |
484 | | |
485 | | void ZooKeeperFilter::onWatchEvent(const int32_t event_type, const int32_t client_state, |
486 | | const std::string& path, const int64_t zxid, |
487 | 2.98k | const int32_t error) { |
488 | 2.98k | config_->stats_.watch_event_.inc(); |
489 | 2.98k | setDynamicMetadata({{"opname", "watch_event"}, |
490 | 2.98k | {"event_type", std::to_string(event_type)}, |
491 | 2.98k | {"client_state", std::to_string(client_state)}, |
492 | 2.98k | {"path", path}, |
493 | 2.98k | {"zxid", std::to_string(zxid)}, |
494 | 2.98k | {"error", std::to_string(error)}}); |
495 | 2.98k | } |
496 | | |
497 | | } // namespace ZooKeeperProxy |
498 | | } // namespace NetworkFilters |
499 | | } // namespace Extensions |
500 | | } // namespace Envoy |