1
#include "source/extensions/access_loggers/stats/stats.h"
2

            
3
#include "envoy/data/accesslog/v3/accesslog.pb.h"
4
#include "envoy/registry/registry.h"
5
#include "envoy/stats/scope.h"
6
#include "envoy/stream_info/filter_state.h"
7

            
8
#include "source/common/formatter/substitution_formatter.h"
9
#include "source/common/stats/symbol_table.h"
10

            
11
namespace Envoy {
12
namespace Extensions {
13
namespace AccessLoggers {
14
namespace StatsAccessLog {
15

            
16
namespace {
17

            
18
using Extensions::Matching::Actions::TransformStat::ActionContext;
19

            
20
class AccessLogState : public StreamInfo::FilterState::Object {
21
public:
22
6
  AccessLogState(Stats::ScopeSharedPtr scope) : scope_(std::move(scope)) {}
23

            
24
  // When the request is destroyed, we need to subtract the value from the gauge.
25
  // We need to look up the gauge again in the scope because it might have been evicted.
26
  // The gauge object itself is kept alive by the shared_ptr in the state, so we can access its
27
  // name and tags to re-lookup/re-create it in the scope.
28
6
  ~AccessLogState() override {
29
6
    for (const auto& [gauge_ptr, state] : inflight_gauges_) {
30
      // TODO(taoxuy):  make this as an accessor of the
31
      // Stat class.
32
2
      Stats::StatNameTagVector tag_names;
33
2
      state.gauge_->iterateTagStatNames(
34
2
          [&tag_names](Stats::StatName name, Stats::StatName value) -> bool {
35
2
            tag_names.emplace_back(name, value);
36
2
            return true;
37
2
          });
38

            
39
      // Using state.gauge_->statName() directly would be incorrect because it returns the fully
40
      // qualified name (including tags). Passing this full name to scope_->gaugeFromStatName(...)
41
      // would cause the scope to attempt tag extraction on the full name. Since the tags in
42
      // AccessLogState are often dynamic and not configured in the global tag extractors, this
43
      // extraction would likely fail to identify the tags correctly, resulting in a gauge with a
44
      // different identity (the full name as the stat name and no tags).
45
2
      auto& gauge = scope_->gaugeFromStatNameWithTags(
46
2
          state.gauge_->tagExtractedStatName(), tag_names, Stats::Gauge::ImportMode::Accumulate);
47
2
      gauge.sub(state.value_);
48
2
    }
49
6
  }
50

            
51
5
  void addInflightGauge(Stats::Gauge* gauge, uint64_t value) {
52
5
    inflight_gauges_.try_emplace(gauge, Stats::GaugeSharedPtr(gauge), value);
53
5
  }
54

            
55
6
  absl::optional<uint64_t> removeInflightGauge(Stats::Gauge* gauge) {
56
6
    auto it = inflight_gauges_.find(gauge);
57
6
    if (it == inflight_gauges_.end()) {
58
3
      return absl::nullopt;
59
3
    }
60
3
    uint64_t value = it->second.value_;
61
3
    inflight_gauges_.erase(it);
62
3
    return value;
63
6
  }
64

            
65
28
  static constexpr absl::string_view key() { return "envoy.access_loggers.stats.access_log_state"; }
66

            
67
private:
68
  struct State {
69
5
    State(Stats::GaugeSharedPtr gauge, uint64_t value) : gauge_(std::move(gauge)), value_(value) {}
70

            
71
    Stats::GaugeSharedPtr gauge_;
72
    uint64_t value_;
73
  };
74

            
75
  Stats::ScopeSharedPtr scope_;
76

            
77
  // The map key holds a raw pointer to the gauge. The value holds a ref-counted pointer to ensure
78
  // the gauge is not destroyed if it is evicted from the stats scope.
79
  absl::flat_hash_map<Stats::Gauge*, State> inflight_gauges_;
80
};
81

            
82
// Parses `format` with the substitution format parser and returns its single provider.
// Throws EnvoyException when the string does not parse into exactly one provider (parse
// failures are surfaced through THROW_OR_RETURN_VALUE).
Formatter::FormatterProviderPtr
parseValueFormat(absl::string_view format,
                 const std::vector<Formatter::CommandParserPtr>& commands) {
  auto providers =
      THROW_OR_RETURN_VALUE(Formatter::SubstitutionFormatParser::parse(format, commands),
                            std::vector<Formatter::FormatterProviderPtr>);
  if (providers.size() == 1) {
    return std::move(providers.front());
  }
  throw EnvoyException(
      "Stats logger `value_format` string must contain exactly one substitution");
}
94

            
95
// Maps the histogram unit from the stats access logger configuration onto the matching
// Stats::Histogram::Unit. Throws EnvoyException for a value that has no mapping.
Stats::Histogram::Unit
convertUnitEnum(envoy::extensions::access_loggers::stats::v3::Config::Histogram::Unit unit) {
  using ConfigHistogram = envoy::extensions::access_loggers::stats::v3::Config::Histogram;
  using StatsUnit = Stats::Histogram::Unit;

  switch (unit) {
  case ConfigHistogram::Unspecified:
    return StatsUnit::Unspecified;
  case ConfigHistogram::Bytes:
    return StatsUnit::Bytes;
  case ConfigHistogram::Microseconds:
    return StatsUnit::Microseconds;
  case ConfigHistogram::Milliseconds:
    return StatsUnit::Milliseconds;
  case ConfigHistogram::Percent:
    return StatsUnit::Percent;
  default:
    throw EnvoyException(fmt::format("Unknown histogram unit value in stats logger: {}",
                                     static_cast<int64_t>(unit)));
  }
}
113

            
114
struct StatTagMetric : public Stats::StatTagMatchingData {
115
10
  StatTagMetric(absl::string_view value) : value_(value) {}
116
13
  absl::string_view value() const override { return value_; }
117
  absl::string_view value_;
118
};
119

            
120
class ActionValidationVisitor
121
    : public Matcher::MatchTreeValidationVisitor<Stats::StatMatchingData> {
122
public:
123
  absl::Status performDataInputValidation(const Matcher::DataInputFactory<Stats::StatMatchingData>&,
124
                                          absl::string_view) override {
125
    return absl::OkStatus();
126
  }
127
};
128

            
129
class TagActionValidationVisitor
130
    : public Matcher::MatchTreeValidationVisitor<Stats::StatTagMatchingData> {
131
public:
132
  absl::Status
133
  performDataInputValidation(const Matcher::DataInputFactory<Stats::StatTagMatchingData>&,
134
10
                             absl::string_view) override {
135
10
    return absl::OkStatus();
136
10
  }
137
};
138

            
139
} // namespace
140

            
141
// Builds the logger from its configuration. An evictable stats scope is created under the
// configured prefix, and every histogram, counter and gauge entry is translated into its runtime
// form. Invalid entries are rejected with EnvoyException:
//  - a counter or gauge must configure exactly one of `value_format` / `value_fixed`;
//  - a gauge must configure exactly one of the add/subtract pair or the set operation, each with
//    a log type other than NotSet, and the pair must use two different log types.
StatsAccessLog::StatsAccessLog(const envoy::extensions::access_loggers::stats::v3::Config& config,
                               Server::Configuration::GenericFactoryContext& context,
                               AccessLog::FilterPtr&& filter,
                               const std::vector<Formatter::CommandParserPtr>& commands)
    : Common::ImplBase(std::move(filter)),
      scope_(context.statsScope().createScope(config.stat_prefix(), true /* evictable */)),
      stat_name_pool_(scope_->symbolTable()), histograms_([&]() {
        std::vector<Histogram> result;
        for (const auto& cfg : config.histograms()) {
          result.emplace_back(NameAndTags(cfg.stat(), stat_name_pool_, commands, context),
                              convertUnitEnum(cfg.unit()),
                              parseValueFormat(cfg.value_format(), commands));
        }
        return result;
      }()),
      counters_([&]() {
        std::vector<Counter> result;
        for (const auto& cfg : config.counters()) {
          // The stat is built first so that errors in its definition are reported before errors
          // in the value settings.
          Counter& counter = result.emplace_back(
              NameAndTags(cfg.stat(), stat_name_pool_, commands, context), nullptr, 0);

          const bool has_format = !cfg.value_format().empty();
          const bool has_fixed = cfg.has_value_fixed();
          if (has_format) {
            if (has_fixed) {
              throw EnvoyException(
                  "Stats logger cannot have both `value_format` and `value_fixed` configured.");
            }
            counter.value_formatter_ = parseValueFormat(cfg.value_format(), commands);
          } else if (has_fixed) {
            counter.value_fixed_ = cfg.value_fixed().value();
          } else {
            throw EnvoyException(
                "Stats logger counter must have either `value_format` or `value_fixed`.");
          }
        }
        return result;
      }()),
      gauges_([&]() {
        using AccessLogType = envoy::data::accesslog::v3::AccessLogType;

        std::vector<Gauge> result;
        for (const auto& cfg : config.gauges()) {
          const bool is_paired = cfg.has_add_subtract();
          const bool is_set = cfg.has_set();
          if (is_paired && is_set) {
            throw EnvoyException(
                "Stats logger gauge cannot have both SET and PAIRED_ADD/PAIRED_SUBTRACT "
                "operations.");
          }
          if (!is_paired && !is_set) {
            throw EnvoyException("Stats logger gauge must have at least one operation configured.");
          }

          // Maps each access log type the gauge reacts to onto the operation to perform.
          absl::InlinedVector<std::pair<AccessLogType, Gauge::OperationType>, 2> operations;

          if (is_paired) {
            const auto add_type = cfg.add_subtract().add_log_type();
            const auto sub_type = cfg.add_subtract().sub_log_type();
            if (add_type == AccessLogType::NotSet || sub_type == AccessLogType::NotSet) {
              throw EnvoyException(
                  "Stats logger gauge add/subtract operation must have a valid log type.");
            }
            if (add_type == sub_type) {
              throw EnvoyException(
                  fmt::format("Duplicate access log type '{}' in gauge operations.",
                              static_cast<int>(add_type)));
            }
            operations.emplace_back(add_type, Gauge::OperationType::PAIRED_ADD);
            operations.emplace_back(sub_type, Gauge::OperationType::PAIRED_SUBTRACT);
          } else {
            const auto set_type = cfg.set().log_type();
            if (set_type == AccessLogType::NotSet) {
              throw EnvoyException("Stats logger gauge set operation must have a valid log type.");
            }
            operations.emplace_back(set_type, Gauge::OperationType::SET);
          }

          Gauge& gauge =
              result.emplace_back(NameAndTags(cfg.stat(), stat_name_pool_, commands, context),
                                  nullptr, 0, std::move(operations));

          const bool has_format = !cfg.value_format().empty();
          const bool has_fixed = cfg.has_value_fixed();
          if (has_format) {
            if (has_fixed) {
              throw EnvoyException(
                  "Stats logger cannot have both `value_format` and `value_fixed` configured.");
            }
            gauge.value_formatter_ = parseValueFormat(cfg.value_format(), commands);
          } else if (has_fixed) {
            gauge.value_fixed_ = cfg.value_fixed().value();
          } else {
            throw EnvoyException(
                "Stats logger gauge must have either `value_format` or `value_fixed`.");
          }
        }
        return result;
      }()) {}
238

            
239
// Interns the configured stat name in `pool` and builds one DynamicTag per configured tag, in
// configuration order.
StatsAccessLog::NameAndTags::NameAndTags(
    const envoy::extensions::access_loggers::stats::v3::Config::Stat& cfg,
    Stats::StatNamePool& pool, const std::vector<Formatter::CommandParserPtr>& commands,
    Server::Configuration::GenericFactoryContext& context) {
  name_ = pool.add(cfg.name());

  const auto& configured_tags = cfg.tags();
  for (const auto& tag : configured_tags) {
    dynamic_tags_.emplace_back(tag, pool, commands, context);
  }
}
248

            
249
// Interns the tag name in `pool` and creates the formatter that produces the tag value. When the
// tag configuration carries `rules`, the corresponding match tree is built as well; otherwise
// `rules_` is left untouched.
StatsAccessLog::DynamicTag::DynamicTag(
    const envoy::extensions::access_loggers::stats::v3::Config::Tag& tag_cfg,
    Stats::StatNamePool& pool, const std::vector<Formatter::CommandParserPtr>& commands,
    Server::Configuration::GenericFactoryContext& context)
    : name_(pool.add(tag_cfg.name())),
      value_formatter_(THROW_OR_RETURN_VALUE(
          Formatter::FormatterImpl::create(tag_cfg.value_format(), true, commands),
          Formatter::FormatterPtr)) {
  if (!tag_cfg.has_rules()) {
    return;
  }

  TagActionValidationVisitor visitor;
  ActionContext action_context(pool);
  Matcher::MatchTreeFactory<Stats::StatTagMatchingData, ActionContext> tree_factory(
      action_context, context.serverFactoryContext(), visitor);
  // create() yields a callable that produces the match tree; invoke it immediately.
  auto make_rules = tree_factory.create(tag_cfg.rules());
  rules_ = make_rules();
}
265

            
266
// Formats every dynamic tag for the given stream and returns the resulting tag vector together
// with the storage backing the tag values. A tag that has rules is run through its matcher; a
// matching TransformStatAction may rewrite the value, drop just this tag, or drop the whole stat.
// In the last case the result carries empty tags and storage with the third member set to true.
StatsAccessLog::NameAndTags::TagsResult
StatsAccessLog::NameAndTags::tags(const Formatter::Context& context,
                                  const StreamInfo::StreamInfo& stream_info,
                                  Stats::Scope& scope) const {
  using TransformStatAction = Extensions::Matching::Actions::TransformStat::TransformStatAction;

  const auto tag_count = dynamic_tags_.size();
  Stats::StatNameTagVector tags;
  tags.reserve(tag_count);
  std::vector<Stats::StatNameDynamicStorage> dynamic_storage;
  dynamic_storage.reserve(tag_count);

  for (const auto& dynamic_tag : dynamic_tags_) {
    std::string tag_value = dynamic_tag.value_formatter_->format(context, stream_info);

    if (dynamic_tag.rules_) {
      StatTagMetric data(tag_value);
      const auto result = dynamic_tag.rules_->match(data);
      if (result.isMatch()) {
        const auto* action = dynamic_cast<const TransformStatAction*>(result.action().get());
        if (action != nullptr) {
          // apply() receives the value itself, so the action can modify it in place.
          switch (action->apply(tag_value)) {
          case TransformStatAction::Result::Keep:
            break;
          case TransformStatAction::Result::DropStat:
            return {{}, {}, true};
          case TransformStatAction::Result::DropTag:
            // Skip to the next tag without recording this one.
            continue;
          }
        }
      }
    }

    auto& stored = dynamic_storage.emplace_back(tag_value, scope.symbolTable());
    tags.emplace_back(dynamic_tag.name_, stored.statName());
  }

  return {std::move(tags), std::move(dynamic_storage), false};
}
302

            
303
namespace {
304
// Evaluates `formatter` for the stream and converts the result to an unsigned integer stat value.
//
// A number value is used as is; a string value is parsed with absl::SimpleAtod. When
// `is_percent` is set the value is multiplied by Stats::Histogram::PercentScale. The fractional
// part is discarded by the final conversion.
//
// Returns absl::nullopt (after a rate-limited error log) when the formatter yields neither a
// number nor a numeric string, or when the resulting value is NaN, infinite, negative, or too
// large for uint64_t.
absl::optional<uint64_t> getFormatValue(const Formatter::FormatterProvider& formatter,
                                        const Formatter::Context& context,
                                        const StreamInfo::StreamInfo& stream_info,
                                        bool is_percent) {
  const Protobuf::Value computed_value = formatter.formatValue(context, stream_info);
  double value;
  if (computed_value.has_number_value()) {
    value = computed_value.number_value();
  } else if (computed_value.has_string_value()) {
    if (!absl::SimpleAtod(computed_value.string_value(), &value)) {
      ENVOY_LOG_PERIODIC_MISC(error, std::chrono::seconds(10),
                              "Stats access logger formatted a string that isn't a number: {}",
                              computed_value.string_value());
      return absl::nullopt;
    }
  } else {
    ENVOY_LOG_PERIODIC_MISC(error, std::chrono::seconds(10),
                            "Stats access logger computed non-number value: {}",
                            computed_value.DebugString());
    return absl::nullopt;
  }

  if (is_percent) {
    value *= Stats::Histogram::PercentScale;
  }

  // Converting a double that is outside the range of the destination integer type is undefined
  // behavior, so such values are rejected instead of converted. 2^64 is exactly representable as
  // a double, and the comparison is written so that NaN fails it as well.
  constexpr double Uint64Limit = 18446744073709551616.0;
  if (!(value >= 0.0 && value < Uint64Limit)) {
    ENVOY_LOG_PERIODIC_MISC(
        error, std::chrono::seconds(10),
        "Stats access logger computed a value outside the unsigned 64-bit range: {}", value);
    return absl::nullopt;
  }
  return static_cast<uint64_t>(value);
}
331
} // namespace
332

            
333
// Non-const entry point; all work is done by the const implementation.
void StatsAccessLog::emitLog(const Formatter::Context& context,
                             const StreamInfo::StreamInfo& stream_info) {
  this->emitLogConst(context, stream_info);
}
337

            
338
// Records one sample for every configured histogram, counter and gauge. A stat is skipped for
// this log entry when its tag rules drop it or when its formatter does not produce a usable
// number.
void StatsAccessLog::emitLogConst(const Formatter::Context& context,
                                  const StreamInfo::StreamInfo& stream_info) const {
  for (const auto& histogram : histograms_) {
    // `storage` is never read directly; it backs the tag values referenced from `tags`.
    auto [tags, storage, dropped] = histogram.stat_.tags(context, stream_info, *scope_);
    if (dropped) {
      continue;
    }

    const bool is_percent = histogram.unit_ == Stats::Histogram::Unit::Percent;
    const absl::optional<uint64_t> sample =
        getFormatValue(*histogram.value_formatter_, context, stream_info, is_percent);
    if (!sample.has_value()) {
      continue;
    }

    scope_->histogramFromStatNameWithTags(histogram.stat_.name_, tags, histogram.unit_)
        .recordValue(*sample);
  }

  for (const auto& counter : counters_) {
    // `storage` is never read directly; it backs the tag values referenced from `tags`.
    auto [tags, storage, dropped] = counter.stat_.tags(context, stream_info, *scope_);
    if (dropped) {
      continue;
    }

    // The fixed value applies unless a formatter is configured.
    uint64_t increment = counter.value_fixed_;
    if (counter.value_formatter_ != nullptr) {
      const absl::optional<uint64_t> formatted =
          getFormatValue(*counter.value_formatter_, context, stream_info, false);
      if (!formatted.has_value()) {
        continue;
      }
      increment = *formatted;
    }

    scope_->counterFromStatNameWithTags(counter.stat_.name_, tags).add(increment);
  }

  for (const auto& gauge : gauges_) {
    emitLogForGauge(gauge, context, stream_info);
  }
}
389

            
390
// Applies the operation that `gauge` has configured for the current access log type, if any.
//
// SET writes the value to the gauge. PAIRED_ADD increments the gauge and records the amount in
// the stream's AccessLogState. PAIRED_SUBTRACT removes that record and subtracts the recorded
// amount; nothing is subtracted when there is no record for the gauge.
void StatsAccessLog::emitLogForGauge(const Gauge& gauge, const Formatter::Context& context,
                                     const StreamInfo::StreamInfo& stream_info) const {
  const auto log_type = context.accessLogType();
  const auto op_it =
      std::find_if(gauge.operations_.begin(), gauge.operations_.end(),
                   [log_type](const auto& entry) { return entry.first == log_type; });
  if (op_it == gauge.operations_.end()) {
    // This gauge does not react to the current access log type.
    return;
  }

  // `storage` is never read directly; it backs the tag values referenced from `tags`.
  auto [tags, storage, dropped] = gauge.stat_.tags(context, stream_info, *scope_);
  if (dropped) {
    return;
  }

  // The fixed value applies unless a formatter is configured.
  uint64_t value = gauge.value_fixed_;
  if (gauge.value_formatter_ != nullptr) {
    const absl::optional<uint64_t> formatted =
        getFormatValue(*gauge.value_formatter_, context, stream_info, false);
    if (!formatted.has_value()) {
      return;
    }
    value = *formatted;
  }

  const Gauge::OperationType op = op_it->second;
  const Stats::Gauge::ImportMode import_mode = (op == Gauge::OperationType::SET)
                                                   ? Stats::Gauge::ImportMode::NeverImport
                                                   : Stats::Gauge::ImportMode::Accumulate;
  auto& gauge_stat = scope_->gaugeFromStatNameWithTags(gauge.stat_.name_, tags, import_mode);

  if (op == Gauge::OperationType::SET) {
    gauge_stat.set(value);
    return;
  }
  if (op != Gauge::OperationType::PAIRED_ADD && op != Gauge::OperationType::PAIRED_SUBTRACT) {
    return;
  }

  // The stream info is const here, but the in-flight bookkeeping lives in its filter state.
  auto& filter_state = const_cast<StreamInfo::FilterState&>(stream_info.filterState());
  if (!filter_state.hasData<AccessLogState>(AccessLogState::key())) {
    filter_state.setData(AccessLogState::key(), std::make_shared<AccessLogState>(scope_),
                         StreamInfo::FilterState::StateType::Mutable,
                         StreamInfo::FilterState::LifeSpan::Request);
  }
  auto* state = filter_state.getDataMutable<AccessLogState>(AccessLogState::key());

  if (op == Gauge::OperationType::PAIRED_ADD) {
    state->addInflightGauge(&gauge_stat, value);
    gauge_stat.add(value);
    return;
  }

  // PAIRED_SUBTRACT: subtract what was recorded at add time rather than the value computed now.
  const absl::optional<uint64_t> recorded = state->removeInflightGauge(&gauge_stat);
  if (recorded.has_value()) {
    gauge_stat.sub(*recorded);
  }
}
447

            
448
} // namespace StatsAccessLog
449
} // namespace AccessLoggers
450
} // namespace Extensions
451
} // namespace Envoy