Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/filters/network/zookeeper_proxy/filter.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/filters/network/zookeeper_proxy/filter.h"
2
3
#include <string>
4
#include <vector>
5
6
#include "envoy/config/core/v3/base.pb.h"
7
#include "envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.pb.h"
8
#include "envoy/extensions/filters/network/zookeeper_proxy/v3/zookeeper_proxy.pb.validate.h"
9
10
#include "source/common/buffer/buffer_impl.h"
11
#include "source/common/common/assert.h"
12
#include "source/common/common/enum_to_int.h"
13
#include "source/common/common/fmt.h"
14
#include "source/common/common/logger.h"
15
#include "source/common/stats/utility.h"
16
#include "source/extensions/filters/network/well_known_names.h"
17
18
namespace Envoy {
19
namespace Extensions {
20
namespace NetworkFilters {
21
namespace ZooKeeperProxy {
22
23
ZooKeeperFilterConfig::ZooKeeperFilterConfig(
24
    const std::string& stat_prefix, const uint32_t max_packet_bytes,
25
    const bool enable_per_opcode_request_bytes_metrics,
26
    const bool enable_per_opcode_response_bytes_metrics,
27
    const bool enable_latency_threshold_metrics,
28
    const std::chrono::milliseconds default_latency_threshold,
29
    const LatencyThresholdOverrideList& latency_threshold_overrides, Stats::Scope& scope)
30
    : scope_(scope), max_packet_bytes_(max_packet_bytes), stats_(generateStats(stat_prefix, scope)),
31
      stat_name_set_(scope.symbolTable().makeSet("Zookeeper")),
32
      stat_prefix_(stat_name_set_->add(stat_prefix)), auth_(stat_name_set_->add("auth")),
33
      connect_latency_(stat_name_set_->add("connect_response_latency")),
34
      unknown_scheme_rq_(stat_name_set_->add("unknown_scheme_rq")),
35
      unknown_opcode_latency_(stat_name_set_->add("unknown_opcode_latency")),
36
      enable_per_opcode_request_bytes_metrics_(enable_per_opcode_request_bytes_metrics),
37
      enable_per_opcode_response_bytes_metrics_(enable_per_opcode_response_bytes_metrics),
38
      enable_latency_threshold_metrics_(enable_latency_threshold_metrics),
39
      default_latency_threshold_(default_latency_threshold),
40
625
      latency_threshold_override_map_(parseLatencyThresholdOverrides(latency_threshold_overrides)) {
41
  // https://zookeeper.apache.org/doc/r3.5.4-beta/zookeeperProgrammers.html#sc_BuiltinACLSchemes
42
  // lists commons schemes: "world", "auth", "digest", "host", "x509", and
43
  // "ip". These are used in filter.cc by appending "_rq".
44
625
  stat_name_set_->rememberBuiltins(
45
625
      {"auth_rq", "digest_rq", "host_rq", "ip_rq", "ping_response_rq", "world_rq", "x509_rq"});
46
47
625
  initOpCode(OpCodes::Ping, stats_.ping_resp_, stats_.ping_resp_fast_, stats_.ping_resp_slow_,
48
625
             stats_.ping_rq_bytes_, stats_.ping_resp_bytes_, "ping_response");
49
625
  initOpCode(OpCodes::SetAuth, stats_.auth_resp_, stats_.auth_resp_fast_, stats_.auth_resp_slow_,
50
625
             stats_.auth_rq_bytes_, stats_.auth_resp_bytes_, "auth_response");
51
625
  initOpCode(OpCodes::GetData, stats_.getdata_resp_, stats_.getdata_resp_fast_,
52
625
             stats_.getdata_resp_slow_, stats_.getdata_rq_bytes_, stats_.getdata_resp_bytes_,
53
625
             "getdata_resp");
54
625
  initOpCode(OpCodes::Create, stats_.create_resp_, stats_.create_resp_fast_,
55
625
             stats_.create_resp_slow_, stats_.create_rq_bytes_, stats_.create_resp_bytes_,
56
625
             "create_resp");
57
625
  initOpCode(OpCodes::Create2, stats_.create2_resp_, stats_.create2_resp_fast_,
58
625
             stats_.create2_resp_slow_, stats_.create2_rq_bytes_, stats_.create2_resp_bytes_,
59
625
             "create2_resp");
60
625
  initOpCode(OpCodes::CreateContainer, stats_.createcontainer_resp_,
61
625
             stats_.createcontainer_resp_fast_, stats_.createcontainer_resp_slow_,
62
625
             stats_.createcontainer_rq_bytes_, stats_.createcontainer_resp_bytes_,
63
625
             "createcontainer_resp");
64
625
  initOpCode(OpCodes::CreateTtl, stats_.createttl_resp_, stats_.createttl_resp_fast_,
65
625
             stats_.createttl_resp_slow_, stats_.createttl_rq_bytes_, stats_.createttl_resp_bytes_,
66
625
             "createttl_resp");
67
625
  initOpCode(OpCodes::SetData, stats_.setdata_resp_, stats_.setdata_resp_fast_,
68
625
             stats_.setdata_resp_slow_, stats_.setdata_rq_bytes_, stats_.setdata_resp_bytes_,
69
625
             "setdata_resp");
70
625
  initOpCode(OpCodes::GetChildren, stats_.getchildren_resp_, stats_.getchildren_resp_fast_,
71
625
             stats_.getchildren_resp_slow_, stats_.getchildren_rq_bytes_,
72
625
             stats_.getchildren_resp_bytes_, "getchildren_resp");
73
625
  initOpCode(OpCodes::GetChildren2, stats_.getchildren2_resp_, stats_.getchildren2_resp_fast_,
74
625
             stats_.getchildren2_resp_slow_, stats_.getchildren2_rq_bytes_,
75
625
             stats_.getchildren2_resp_bytes_, "getchildren2_resp");
76
625
  initOpCode(OpCodes::Delete, stats_.delete_resp_, stats_.delete_resp_fast_,
77
625
             stats_.delete_resp_slow_, stats_.delete_rq_bytes_, stats_.delete_resp_bytes_,
78
625
             "delete_resp");
79
625
  initOpCode(OpCodes::Exists, stats_.exists_resp_, stats_.exists_resp_fast_,
80
625
             stats_.exists_resp_slow_, stats_.exists_rq_bytes_, stats_.exists_resp_bytes_,
81
625
             "exists_resp");
82
625
  initOpCode(OpCodes::GetAcl, stats_.getacl_resp_, stats_.getacl_resp_fast_,
83
625
             stats_.getacl_resp_slow_, stats_.getacl_rq_bytes_, stats_.getacl_resp_bytes_,
84
625
             "getacl_resp");
85
625
  initOpCode(OpCodes::SetAcl, stats_.setacl_resp_, stats_.setacl_resp_fast_,
86
625
             stats_.setacl_resp_slow_, stats_.setacl_rq_bytes_, stats_.setacl_resp_bytes_,
87
625
             "setacl_resp");
88
625
  initOpCode(OpCodes::Sync, stats_.sync_resp_, stats_.sync_resp_fast_, stats_.sync_resp_slow_,
89
625
             stats_.sync_rq_bytes_, stats_.sync_resp_bytes_, "sync_resp");
90
625
  initOpCode(OpCodes::Check, stats_.check_resp_, stats_.check_resp_fast_, stats_.check_resp_slow_,
91
625
             stats_.check_rq_bytes_, stats_.check_resp_bytes_, "check_resp");
92
625
  initOpCode(OpCodes::Multi, stats_.multi_resp_, stats_.multi_resp_fast_, stats_.multi_resp_slow_,
93
625
             stats_.multi_rq_bytes_, stats_.multi_resp_bytes_, "multi_resp");
94
625
  initOpCode(OpCodes::Reconfig, stats_.reconfig_resp_, stats_.reconfig_resp_fast_,
95
625
             stats_.reconfig_resp_slow_, stats_.reconfig_rq_bytes_, stats_.reconfig_resp_bytes_,
96
625
             "reconfig_resp");
97
625
  initOpCode(OpCodes::SetWatches, stats_.setwatches_resp_, stats_.setwatches_resp_fast_,
98
625
             stats_.setwatches_resp_slow_, stats_.setwatches_rq_bytes_,
99
625
             stats_.setwatches_resp_bytes_, "setwatches_resp");
100
625
  initOpCode(OpCodes::SetWatches2, stats_.setwatches2_resp_, stats_.setwatches2_resp_fast_,
101
625
             stats_.setwatches2_resp_slow_, stats_.setwatches2_rq_bytes_,
102
625
             stats_.setwatches2_resp_bytes_, "setwatches2_resp");
103
625
  initOpCode(OpCodes::AddWatch, stats_.addwatch_resp_, stats_.addwatch_resp_fast_,
104
625
             stats_.addwatch_resp_slow_, stats_.addwatch_rq_bytes_, stats_.addwatch_resp_bytes_,
105
625
             "addwatch_resp");
106
625
  initOpCode(OpCodes::CheckWatches, stats_.checkwatches_resp_, stats_.checkwatches_resp_fast_,
107
625
             stats_.checkwatches_resp_slow_, stats_.checkwatches_rq_bytes_,
108
625
             stats_.checkwatches_resp_bytes_, "checkwatches_resp");
109
625
  initOpCode(OpCodes::RemoveWatches, stats_.removewatches_resp_, stats_.removewatches_resp_fast_,
110
625
             stats_.removewatches_resp_slow_, stats_.removewatches_rq_bytes_,
111
625
             stats_.removewatches_resp_bytes_, "removewatches_resp");
112
625
  initOpCode(OpCodes::GetEphemerals, stats_.getephemerals_resp_, stats_.getephemerals_resp_fast_,
113
625
             stats_.getephemerals_resp_slow_, stats_.getephemerals_rq_bytes_,
114
625
             stats_.getephemerals_resp_bytes_, "getephemerals_resp");
115
625
  initOpCode(OpCodes::GetAllChildrenNumber, stats_.getallchildrennumber_resp_,
116
625
             stats_.getallchildrennumber_resp_fast_, stats_.getallchildrennumber_resp_slow_,
117
625
             stats_.getallchildrennumber_rq_bytes_, stats_.getallchildrennumber_resp_bytes_,
118
625
             "getallchildrennumber_resp");
119
625
  initOpCode(OpCodes::Close, stats_.close_resp_, stats_.close_resp_fast_, stats_.close_resp_slow_,
120
625
             stats_.close_rq_bytes_, stats_.close_resp_bytes_, "close_resp");
121
625
}
122
123
ErrorBudgetResponseType
124
ZooKeeperFilterConfig::errorBudgetDecision(const OpCodes opcode,
125
0
                                           const std::chrono::milliseconds latency) const {
126
0
  if (!enable_latency_threshold_metrics_) {
127
0
    return ErrorBudgetResponseType::None;
128
0
  }
129
  // Set latency threshold for the current opcode.
130
0
  std::chrono::milliseconds latency_threshold = default_latency_threshold_;
131
0
  int32_t opcode_val = enumToSignedInt(opcode);
132
0
  auto it = latency_threshold_override_map_.find(opcode_val);
133
0
  if (it != latency_threshold_override_map_.end()) {
134
0
    latency_threshold = it->second;
135
0
  }
136
137
  // Determine fast/slow response based on the threshold.
138
0
  if (latency <= latency_threshold) {
139
0
    return ErrorBudgetResponseType::Fast;
140
0
  }
141
142
0
  return ErrorBudgetResponseType::Slow;
143
0
}
144
145
void ZooKeeperFilterConfig::initOpCode(OpCodes opcode, Stats::Counter& resp_counter,
146
                                       Stats::Counter& resp_fast_counter,
147
                                       Stats::Counter& resp_slow_counter,
148
                                       Stats::Counter& rq_bytes_counter,
149
16.2k
                                       Stats::Counter& resp_bytes_counter, absl::string_view name) {
150
16.2k
  OpCodeInfo& opcode_info = op_code_map_[opcode];
151
16.2k
  opcode_info.resp_counter_ = &resp_counter;
152
16.2k
  opcode_info.resp_fast_counter_ = &resp_fast_counter;
153
16.2k
  opcode_info.resp_slow_counter_ = &resp_slow_counter;
154
16.2k
  opcode_info.rq_bytes_counter_ = &rq_bytes_counter;
155
16.2k
  opcode_info.resp_bytes_counter_ = &resp_bytes_counter;
156
16.2k
  opcode_info.opname_ = std::string(name);
157
16.2k
  opcode_info.latency_name_ = stat_name_set_->add(absl::StrCat(name, "_latency"));
158
16.2k
}
159
160
427
int32_t ZooKeeperFilterConfig::getOpCodeIndex(LatencyThresholdOverride_Opcode opcode) {
161
427
  const OpcodeMap& opcode_map = opcodeMap();
162
427
  auto it = opcode_map.find(opcode);
163
427
  if (it != opcode_map.end()) {
164
427
    return it->second;
165
427
  }
166
0
  throw EnvoyException(fmt::format("Unknown opcode from config: {}", static_cast<int32_t>(opcode)));
167
427
}
168
169
LatencyThresholdOverrideMap ZooKeeperFilterConfig::parseLatencyThresholdOverrides(
170
625
    const LatencyThresholdOverrideList& latency_threshold_overrides) {
171
625
  LatencyThresholdOverrideMap latency_threshold_override_map;
172
625
  for (const auto& threshold_override : latency_threshold_overrides) {
173
428
    latency_threshold_override_map[getOpCodeIndex(threshold_override.opcode())] =
174
428
        std::chrono::milliseconds(PROTOBUF_GET_MS_REQUIRED(threshold_override, threshold));
175
428
  }
176
625
  return latency_threshold_override_map;
177
625
}
178
179
ZooKeeperFilter::ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config, TimeSource& time_source)
180
624
    : config_(std::move(config)), decoder_(createDecoder(*this, time_source)) {}
181
182
624
void ZooKeeperFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
183
624
  read_callbacks_ = &callbacks;
184
624
}
185
186
0
Network::FilterStatus ZooKeeperFilter::onData(Buffer::Instance& data, bool) {
187
0
  clearDynamicMetadata();
188
0
  return decoder_->onData(data);
189
0
}
190
191
9.54k
Network::FilterStatus ZooKeeperFilter::onWrite(Buffer::Instance& data, bool) {
192
9.54k
  clearDynamicMetadata();
193
9.54k
  return decoder_->onWrite(data);
194
9.54k
}
195
196
0
Network::FilterStatus ZooKeeperFilter::onNewConnection() { return Network::FilterStatus::Continue; }
197
198
624
DecoderPtr ZooKeeperFilter::createDecoder(DecoderCallbacks& callbacks, TimeSource& time_source) {
199
624
  return std::make_unique<DecoderImpl>(callbacks, config_->maxPacketBytes(), time_source);
200
624
}
201
202
6.16k
void ZooKeeperFilter::setDynamicMetadata(const std::string& key, const std::string& value) {
203
6.16k
  setDynamicMetadata({{key, value}});
204
6.16k
}
205
206
9.54k
void ZooKeeperFilter::clearDynamicMetadata() {
207
9.54k
  envoy::config::core::v3::Metadata& dynamic_metadata =
208
9.54k
      read_callbacks_->connection().streamInfo().dynamicMetadata();
209
9.54k
  auto& metadata =
210
9.54k
      (*dynamic_metadata.mutable_filter_metadata())[NetworkFilterNames::get().ZooKeeperProxy];
211
9.54k
  metadata.mutable_fields()->clear();
212
9.54k
}
213
214
void ZooKeeperFilter::setDynamicMetadata(
215
9.14k
    const std::vector<std::pair<const std::string, const std::string>>& data) {
216
9.14k
  envoy::config::core::v3::Metadata& dynamic_metadata =
217
9.14k
      read_callbacks_->connection().streamInfo().dynamicMetadata();
218
9.14k
  ProtobufWkt::Struct metadata(
219
9.14k
      (*dynamic_metadata.mutable_filter_metadata())[NetworkFilterNames::get().ZooKeeperProxy]);
220
9.14k
  auto& fields = *metadata.mutable_fields();
221
222
24.0k
  for (const auto& pair : data) {
223
24.0k
    auto val = ProtobufWkt::Value();
224
24.0k
    val.set_string_value(pair.second);
225
24.0k
    fields.insert({pair.first, val});
226
24.0k
  }
227
228
9.14k
  read_callbacks_->connection().streamInfo().setDynamicMetadata(
229
9.14k
      NetworkFilterNames::get().ZooKeeperProxy, metadata);
230
9.14k
}
231
232
0
void ZooKeeperFilter::onConnect(const bool readonly) {
233
0
  if (readonly) {
234
0
    config_->stats_.connect_readonly_rq_.inc();
235
0
    setDynamicMetadata("opname", "connect_readonly");
236
0
  } else {
237
0
    config_->stats_.connect_rq_.inc();
238
0
    setDynamicMetadata("opname", "connect");
239
0
  }
240
0
}
241
242
3.18k
void ZooKeeperFilter::onDecodeError() {
243
3.18k
  config_->stats_.decoder_error_.inc();
244
3.18k
  setDynamicMetadata("opname", "error");
245
3.18k
}
246
247
0
void ZooKeeperFilter::onRequestBytes(const absl::optional<OpCodes> opcode, const uint64_t bytes) {
248
0
  config_->stats_.request_bytes_.add(bytes);
249
250
0
  if (config_->enable_per_opcode_request_bytes_metrics_ && opcode.has_value()) {
251
0
    if (*opcode == OpCodes::Connect) {
252
0
      config_->stats_.connect_rq_bytes_.add(bytes);
253
0
    } else {
254
0
      ASSERT(config_->op_code_map_.contains(*opcode));
255
0
      config_->op_code_map_[*opcode].rq_bytes_counter_->add(bytes);
256
0
    }
257
0
  }
258
259
0
  setDynamicMetadata("bytes", std::to_string(bytes));
260
0
}
261
262
2.98k
void ZooKeeperFilter::onResponseBytes(const absl::optional<OpCodes> opcode, const uint64_t bytes) {
263
2.98k
  config_->stats_.response_bytes_.add(bytes);
264
265
2.98k
  if (config_->enable_per_opcode_response_bytes_metrics_ && opcode.has_value()) {
266
0
    if (*opcode == OpCodes::Connect) {
267
0
      config_->stats_.connect_resp_bytes_.add(bytes);
268
0
    } else {
269
0
      ASSERT(config_->op_code_map_.contains(*opcode));
270
0
      config_->op_code_map_[*opcode].resp_bytes_counter_->add(bytes);
271
0
    }
272
0
  }
273
274
2.98k
  setDynamicMetadata("bytes", std::to_string(bytes));
275
2.98k
}
276
277
0
void ZooKeeperFilter::onPing() {
278
0
  config_->stats_.ping_rq_.inc();
279
0
  setDynamicMetadata("opname", "ping");
280
0
}
281
282
0
void ZooKeeperFilter::onAuthRequest(const std::string& scheme) {
283
0
  Stats::Counter& counter = Stats::Utility::counterFromStatNames(
284
0
      config_->scope_, {config_->stat_prefix_, config_->auth_,
285
0
                        config_->stat_name_set_->getBuiltin(absl::StrCat(scheme, "_rq"),
286
0
                                                            config_->unknown_scheme_rq_)});
287
0
  counter.inc();
288
0
  setDynamicMetadata("opname", "auth");
289
0
}
290
291
0
void ZooKeeperFilter::onGetDataRequest(const std::string& path, const bool watch) {
292
0
  config_->stats_.getdata_rq_.inc();
293
0
  setDynamicMetadata({{"opname", "getdata"}, {"path", path}, {"watch", watch ? "true" : "false"}});
294
0
}
295
296
void ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags,
297
0
                                      const OpCodes opcode) {
298
0
  std::string opname;
299
300
0
  switch (opcode) {
301
0
  case OpCodes::Create:
302
0
    opname = "create";
303
0
    config_->stats_.create_rq_.inc();
304
0
    break;
305
0
  case OpCodes::Create2:
306
0
    opname = "create2";
307
0
    config_->stats_.create2_rq_.inc();
308
0
    break;
309
0
  case OpCodes::CreateContainer:
310
0
    opname = "createcontainer";
311
0
    config_->stats_.createcontainer_rq_.inc();
312
0
    break;
313
0
  case OpCodes::CreateTtl:
314
0
    opname = "createttl";
315
0
    config_->stats_.createttl_rq_.inc();
316
0
    break;
317
0
  default:
318
0
    throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode)));
319
0
    break;
320
0
  }
321
322
0
  setDynamicMetadata(
323
0
      {{"opname", opname}, {"path", path}, {"create_type", createFlagsToString(flags)}});
324
0
}
325
326
0
void ZooKeeperFilter::onSetRequest(const std::string& path) {
327
0
  config_->stats_.setdata_rq_.inc();
328
0
  setDynamicMetadata({{"opname", "setdata"}, {"path", path}});
329
0
}
330
331
void ZooKeeperFilter::onGetChildrenRequest(const std::string& path, const bool watch,
332
0
                                           const bool v2) {
333
0
  std::string opname = "getchildren";
334
335
0
  if (v2) {
336
0
    config_->stats_.getchildren2_rq_.inc();
337
0
    opname = "getchildren2";
338
0
  } else {
339
0
    config_->stats_.getchildren_rq_.inc();
340
0
  }
341
342
0
  setDynamicMetadata({{"opname", opname}, {"path", path}, {"watch", watch ? "true" : "false"}});
343
0
}
344
345
0
void ZooKeeperFilter::onDeleteRequest(const std::string& path, const int32_t version) {
346
0
  config_->stats_.delete_rq_.inc();
347
0
  setDynamicMetadata({{"opname", "delete"}, {"path", path}, {"version", std::to_string(version)}});
348
0
}
349
350
0
void ZooKeeperFilter::onExistsRequest(const std::string& path, const bool watch) {
351
0
  config_->stats_.exists_rq_.inc();
352
0
  setDynamicMetadata({{"opname", "exists"}, {"path", path}, {"watch", watch ? "true" : "false"}});
353
0
}
354
355
0
void ZooKeeperFilter::onGetAclRequest(const std::string& path) {
356
0
  config_->stats_.getacl_rq_.inc();
357
0
  setDynamicMetadata({{"opname", "getacl"}, {"path", path}});
358
0
}
359
360
0
void ZooKeeperFilter::onSetAclRequest(const std::string& path, const int32_t version) {
361
0
  config_->stats_.setacl_rq_.inc();
362
0
  setDynamicMetadata({{"opname", "setacl"}, {"path", path}, {"version", std::to_string(version)}});
363
0
}
364
365
0
void ZooKeeperFilter::onSyncRequest(const std::string& path) {
366
0
  config_->stats_.sync_rq_.inc();
367
0
  setDynamicMetadata({{"opname", "sync"}, {"path", path}});
368
0
}
369
370
0
void ZooKeeperFilter::onCheckRequest(const std::string&, const int32_t) {
371
0
  config_->stats_.check_rq_.inc();
372
0
}
373
374
0
void ZooKeeperFilter::onCheckWatchesRequest(const std::string& path, const int32_t) {
375
0
  config_->stats_.checkwatches_rq_.inc();
376
0
  setDynamicMetadata({{"opname", "checkwatches"}, {"path", path}});
377
0
}
378
379
0
void ZooKeeperFilter::onRemoveWatchesRequest(const std::string& path, const int32_t) {
380
0
  config_->stats_.removewatches_rq_.inc();
381
0
  setDynamicMetadata({{"opname", "removewatches"}, {"path", path}});
382
0
}
383
384
0
void ZooKeeperFilter::onMultiRequest() {
385
0
  config_->stats_.multi_rq_.inc();
386
0
  setDynamicMetadata("opname", "multi");
387
0
}
388
389
0
void ZooKeeperFilter::onReconfigRequest() {
390
0
  config_->stats_.reconfig_rq_.inc();
391
0
  setDynamicMetadata("opname", "reconfig");
392
0
}
393
394
0
void ZooKeeperFilter::onSetWatchesRequest() {
395
0
  config_->stats_.setwatches_rq_.inc();
396
0
  setDynamicMetadata("opname", "setwatches");
397
0
}
398
399
0
void ZooKeeperFilter::onSetWatches2Request() {
400
0
  config_->stats_.setwatches2_rq_.inc();
401
0
  setDynamicMetadata("opname", "setwatches2");
402
0
}
403
404
0
void ZooKeeperFilter::onAddWatchRequest(const std::string& path, const int32_t mode) {
405
0
  config_->stats_.addwatch_rq_.inc();
406
0
  setDynamicMetadata({{"opname", "addwatch"}, {"path", path}, {"mode", std::to_string(mode)}});
407
0
}
408
409
0
void ZooKeeperFilter::onGetEphemeralsRequest(const std::string& path) {
410
0
  config_->stats_.getephemerals_rq_.inc();
411
0
  setDynamicMetadata({{"opname", "getephemerals"}, {"path", path}});
412
0
}
413
414
0
void ZooKeeperFilter::onGetAllChildrenNumberRequest(const std::string& path) {
415
0
  config_->stats_.getallchildrennumber_rq_.inc();
416
0
  setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path}});
417
0
}
418
419
0
void ZooKeeperFilter::onCloseRequest() {
420
0
  config_->stats_.close_rq_.inc();
421
0
  setDynamicMetadata("opname", "close");
422
0
}
423
424
void ZooKeeperFilter::onConnectResponse(const int32_t proto_version, const int32_t timeout,
425
                                        const bool readonly,
426
0
                                        const std::chrono::milliseconds latency) {
427
0
  config_->stats_.connect_resp_.inc();
428
429
0
  switch (config_->errorBudgetDecision(OpCodes::Connect, latency)) {
430
0
  case ErrorBudgetResponseType::Fast:
431
0
    config_->stats_.connect_resp_fast_.inc();
432
0
    break;
433
0
  case ErrorBudgetResponseType::Slow:
434
0
    config_->stats_.connect_resp_slow_.inc();
435
0
    break;
436
0
  case ErrorBudgetResponseType::None:
437
0
    break;
438
0
  }
439
440
0
  Stats::Histogram& histogram = Stats::Utility::histogramFromElements(
441
0
      config_->scope_, {config_->stat_prefix_, config_->connect_latency_},
442
0
      Stats::Histogram::Unit::Milliseconds);
443
0
  histogram.recordValue(latency.count());
444
445
0
  setDynamicMetadata({{"opname", "connect_response"},
446
0
                      {"protocol_version", std::to_string(proto_version)},
447
0
                      {"timeout", std::to_string(timeout)},
448
0
                      {"readonly", std::to_string(readonly)}});
449
0
}
450
451
void ZooKeeperFilter::onResponse(const OpCodes opcode, const int32_t xid, const int64_t zxid,
452
0
                                 const int32_t error, const std::chrono::milliseconds latency) {
453
0
  Stats::StatName opcode_latency = config_->unknown_opcode_latency_;
454
0
  std::string opname = "";
455
0
  auto iter = config_->op_code_map_.find(opcode);
456
0
  if (iter != config_->op_code_map_.end()) {
457
0
    const ZooKeeperFilterConfig::OpCodeInfo& opcode_info = iter->second;
458
0
    opcode_info.resp_counter_->inc();
459
0
    opcode_latency = opcode_info.latency_name_;
460
0
    opname = opcode_info.opname_;
461
462
0
    switch (config_->errorBudgetDecision(opcode, latency)) {
463
0
    case ErrorBudgetResponseType::Fast:
464
0
      opcode_info.resp_fast_counter_->inc();
465
0
      break;
466
0
    case ErrorBudgetResponseType::Slow:
467
0
      opcode_info.resp_slow_counter_->inc();
468
0
      break;
469
0
    case ErrorBudgetResponseType::None:
470
0
      break;
471
0
    }
472
0
  }
473
474
0
  Stats::Histogram& histogram = Stats::Utility::histogramFromStatNames(
475
0
      config_->scope_, {config_->stat_prefix_, opcode_latency},
476
0
      Stats::Histogram::Unit::Milliseconds);
477
0
  histogram.recordValue(latency.count());
478
479
0
  setDynamicMetadata({{"opname", opname},
480
0
                      {"xid", std::to_string(xid)},
481
0
                      {"zxid", std::to_string(zxid)},
482
0
                      {"error", std::to_string(error)}});
483
0
}
484
485
void ZooKeeperFilter::onWatchEvent(const int32_t event_type, const int32_t client_state,
486
                                   const std::string& path, const int64_t zxid,
487
2.98k
                                   const int32_t error) {
488
2.98k
  config_->stats_.watch_event_.inc();
489
2.98k
  setDynamicMetadata({{"opname", "watch_event"},
490
2.98k
                      {"event_type", std::to_string(event_type)},
491
2.98k
                      {"client_state", std::to_string(client_state)},
492
2.98k
                      {"path", path},
493
2.98k
                      {"zxid", std::to_string(zxid)},
494
2.98k
                      {"error", std::to_string(error)}});
495
2.98k
}
496
497
} // namespace ZooKeeperProxy
498
} // namespace NetworkFilters
499
} // namespace Extensions
500
} // namespace Envoy