1
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h"
2

            
3
namespace Envoy {
4
namespace Extensions {
5
namespace Tracers {
6
namespace OpenTelemetry {
7

            
8
namespace {}
9

            
10
14
void SamplingController::update() {
11
14
  absl::WriterMutexLock lock{&stream_summary_mutex_};
12
14
  const auto top_k = stream_summary_->getTopK();
13
14
  const auto last_period_count = stream_summary_->getN();
14

            
15
  // update sampling exponents
16
14
  update(top_k, last_period_count,
17
14
         sampler_config_provider_->getSamplerConfig().getRootSpansPerMinute());
18
  // Note: getTopK() returns references to values in StreamSummary.
19
  // Do not destroy it while top_k is used!
20
14
  stream_summary_ = std::make_unique<StreamSummaryT>(STREAM_SUMMARY_SIZE);
21
14
}
22

            
23
void SamplingController::update(const TopKListT& top_k, uint64_t last_period_count,
24
14
                                uint32_t total_wanted) {
25

            
26
14
  SamplingExponentsT new_sampling_exponents;
27
  // start with sampling exponent 0, which means multiplicity == 1 (every span is sampled)
28
137
  for (auto const& counter : top_k) {
29
137
    new_sampling_exponents[counter.getItem()] = SamplingState(0);
30
137
  }
31

            
32
  // use the last entry as "rest bucket", which is used for new/unknown requests
33
14
  rest_bucket_key_ = (!top_k.empty()) ? top_k.back().getItem() : "";
34

            
35
14
  calculateSamplingExponents(top_k, total_wanted, new_sampling_exponents);
36
14
  last_effective_count_ = calculateEffectiveCount(top_k, new_sampling_exponents);
37
14
  logSamplingInfo(top_k, new_sampling_exponents, last_period_count, total_wanted);
38

            
39
14
  absl::WriterMutexLock lock{&sampling_exponents_mutex_};
40
14
  sampling_exponents_ = std::move(new_sampling_exponents);
41
14
}
42

            
43
3
uint64_t SamplingController::getEffectiveCount() const {
44
3
  absl::ReaderMutexLock lock{&stream_summary_mutex_};
45
3
  return last_effective_count_;
46
3
}
47

            
48
57544
void SamplingController::offer(const std::string& sampling_key) {
49
57544
  if (!sampling_key.empty()) {
50
57530
    absl::WriterMutexLock lock{&stream_summary_mutex_};
51
57530
    stream_summary_->offer(sampling_key);
52
57530
  }
53
57544
}
54

            
55
1455
// Returns the sampling state to use for sampling_key.
//
// Lookup order:
//   1. the entry stored for sampling_key in sampling_exponents_,
//   2. the entry stored for the "rest bucket" key (rest_bucket_key_),
//   3. a fallback exponent derived from the number of requests counted so far in the current
//      stream summary, clamped to MAX_SAMPLING_EXPONENT.
//
// @param sampling_key key as produced by getSamplingKey().
// @return a copy of the stored SamplingState, or a newly constructed one for the fallback.
SamplingState SamplingController::getSamplingState(const std::string& sampling_key) const {
  { // scope for lock: sampling_exponents_mutex_ is released before stream_summary_mutex_ is
    // taken below. update() acquires the two mutexes in the opposite order, so they must not be
    // held nested here.
    absl::ReaderMutexLock sax_lock{&sampling_exponents_mutex_};
    auto iter = sampling_exponents_.find(sampling_key);
    if (iter != sampling_exponents_.end()) {
      return iter->second;
    }

    // try to use "rest bucket"
    auto rest_bucket_iter = sampling_exponents_.find(rest_bucket_key_);
    if (rest_bucket_iter != sampling_exponents_.end()) {
      return rest_bucket_iter->second;
    }
  }

  // If we can't find a sampling exponent, we calculate it based on the total number of requests
  // in this period. This should also handle the "warm up phase" where no top_k is available
  const auto divisor = sampler_config_provider_->getSamplerConfig().getRootSpansPerMinute() / 2;
  if (divisor == 0) {
    return SamplingState{MAX_SAMPLING_EXPONENT};
  }
  absl::ReaderMutexLock ss_lock{&stream_summary_mutex_};
  // Clamp in 64 bit first and narrow afterwards, so that a very large quotient cannot wrap
  // around to a small exponent when converted to uint32_t.
  const uint64_t quotient = stream_summary_->getN() / divisor;
  const uint64_t clamped = std::min<uint64_t>(quotient, MAX_SAMPLING_EXPONENT);
  return SamplingState{static_cast<uint32_t>(clamped)};
}
80

            
81
// Builds the sampling key "<method>_<path>" for a request.
//
// @param path_query request path, optionally followed by '?' and a query string.
// @param method     request method, used as prefix of the key.
// @return method, an underscore and path_query truncated before the first '?'.
std::string SamplingController::getSamplingKey(const absl::string_view path_query,
                                               const absl::string_view method) {
  // remove query part: find() yields npos when there is no '?', and substr() clamps the
  // requested length to the remaining size, so in that case the whole input is kept.
  const absl::string_view path = path_query.substr(0, path_query.find('?'));
  return absl::StrCat(method, "_", path);
}
89

            
90
void SamplingController::logSamplingInfo(const TopKListT& top_k,
91
                                         const SamplingExponentsT& new_sampling_exponents,
92
14
                                         uint64_t last_period_count, uint32_t total_wanted) const {
93
14
  ENVOY_LOG(debug,
94
14
            "Updating sampling info. top_k.size(): {}, last_period_count: {}, total_wanted: {}",
95
14
            top_k.size(), last_period_count, total_wanted);
96
137
  for (auto const& counter : top_k) {
97
137
    auto sampling_state = new_sampling_exponents.find(counter.getItem());
98
137
    ENVOY_LOG(debug, "- {}: value: {}, exponent: {}", counter.getItem(), counter.getValue(),
99
137
              sampling_state->second.getExponent());
100
137
  }
101
14
}
102

            
103
uint64_t SamplingController::calculateEffectiveCount(const TopKListT& top_k,
104
90
                                                     const SamplingExponentsT& sampling_exponents) {
105
90
  uint64_t cnt = 0;
106
1101
  for (auto const& counter : top_k) {
107
1101
    auto sampling_state = sampling_exponents.find(counter.getItem());
108
1101
    if (sampling_state != sampling_exponents.end()) {
109
1101
      auto counterVal = counter.getValue();
110
1101
      auto mul = sampling_state->second.getMultiplicity();
111
1101
      auto res = counterVal / mul;
112
1101
      cnt += res;
113
1101
    }
114
1101
  }
115
90
  return cnt;
116
90
}
117

            
118
void SamplingController::calculateSamplingExponents(
119
    const TopKListT& top_k, uint32_t total_wanted,
120
14
    SamplingExponentsT& new_sampling_exponents) const {
121
14
  const auto top_k_size = top_k.size();
122
14
  if (top_k_size == 0 || total_wanted == 0) {
123
4
    return;
124
4
  }
125

            
126
137
  for (auto& counter : top_k) {
127
    // allowed multiplicity for this entry
128
137
    auto wanted_multiplicity = counter.getValue() * top_k_size / total_wanted;
129
137
    auto sampling_state = new_sampling_exponents.find(counter.getItem());
130
    // sampling exponent has to be a power of 2. Find the exponent to have multiplicity near to
131
    // wanted_multiplicity
132
318
    while (wanted_multiplicity > sampling_state->second.getMultiplicity() &&
133
318
           sampling_state->second.getExponent() < MAX_SAMPLING_EXPONENT) {
134
181
      sampling_state->second.increaseExponent();
135
181
    }
136
137
    if (wanted_multiplicity < sampling_state->second.getMultiplicity()) {
137
      // we want to have multiplicity <= wanted_multiplicity, therefore exponent is decreased once.
138
35
      sampling_state->second.decreaseExponent();
139
35
    }
140
137
  }
141

            
142
10
  auto effective_count = calculateEffectiveCount(top_k, new_sampling_exponents);
143
  // There might be entries where allowed_per_entry is greater than their count.
144
  // Therefore, we would sample less than total_wanted.
145
  // To avoid this, we decrease the exponent of other entries if possible
146
10
  if (effective_count < total_wanted) {
147
18
    for (int i = 0; i < 5; i++) { // max tries
148
78
      for (auto reverse_it = top_k.rbegin(); reverse_it != top_k.rend();
149
66
           ++reverse_it) { // start with lowest frequency
150
66
        auto rev_sampling_state = new_sampling_exponents.find(reverse_it->getItem());
151
66
        rev_sampling_state->second.decreaseExponent();
152
66
        effective_count = calculateEffectiveCount(top_k, new_sampling_exponents);
153
66
        if (effective_count >= total_wanted) { // we are done
154
4
          return;
155
4
        }
156
66
      }
157
16
    }
158
6
  }
159
10
}
160
} // namespace OpenTelemetry
161
} // namespace Tracers
162
} // namespace Extensions
163
} // namespace Envoy