1
#pragma once
2

            
3
#include <algorithm>
4
#include <cstdint>
5
#include <list>
6

            
7
#include "source/common/common/assert.h"
8

            
9
#include "absl/container/flat_hash_map.h"
10
#include "absl/status/status.h"
11
#include "absl/types/optional.h"
12

            
13
namespace Envoy {
14
namespace Extensions {
15
namespace Tracers {
16
namespace OpenTelemetry {
17

            
18
namespace detail {
19

            
20
// Forward declaration: Counter<T> stores an iterator into a list of Bucket<T>, while Bucket<T>
// itself owns a list of Counter<T>, so one of the two has to be declared ahead of its definition.
template <typename T> struct Bucket;
21

            
22
// Iterator into a std::list of Bucket<T>; used by a counter to refer to the bucket holding it.
template <typename T> using BucketIterator = typename std::list<Bucket<T>>::iterator;
23

            
24
template <typename T> struct Counter {
25
  BucketIterator<T> bucket;
26
  absl::optional<T> item{};
27
  uint64_t value{};
28
  uint64_t error{};
29

            
30
4681
  explicit Counter(BucketIterator<T> bucket) : bucket(bucket) {}
31
  Counter(Counter const&) = delete;
32
  Counter& operator=(Counter const&) = delete;
33
};
34

            
35
// Iterator into a std::list of Counter<T>, i.e. into the `children` list of a bucket.
template <typename T> using CounterIterator = typename std::list<Counter<T>>::iterator;
36

            
37
template <typename T> struct Bucket {
38
  uint64_t value;
39
  std::list<Counter<T>> children{};
40

            
41
55372
  explicit Bucket(uint64_t value) : value(value) {}
42
  Bucket(Bucket const&) = delete;
43
  Bucket& operator=(Bucket const&) = delete;
44
};
45

            
46
} // namespace detail
47

            
48
template <typename T> class Counter {
49
private:
50
  T const& item_;
51
  uint64_t value_;
52
  uint64_t error_;
53

            
54
public:
55
57885
  Counter(detail::Counter<T> const& c) : item_(*c.item), value_(c.value), error_(c.error) {}
56

            
57
1628
  T const& getItem() const { return item_; }
58
1278
  uint64_t getValue() const { return value_; }
59
40
  uint64_t getError() const { return error_; }
60
};
61

            
62
/**
63
 * @brief Space Saving algorithm implementation also know as "HeavyHitter".
64
 * based on the "Space Saving algorithm", AKA "HeavyHitter"
65
 * See:
66
 * https://cse.hkust.edu.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf
67
 * https://github.com/fzakaria/space-saving/tree/master
68
 *
69
 */
70
template <typename T> class StreamSummary {
71
private:
72
  const size_t capacity_;
73
  uint64_t n_{};
74
  absl::flat_hash_map<T, detail::CounterIterator<T>> cache_{};
75
  std::list<detail::Bucket<T>> buckets_{};
76

            
77
  typename detail::CounterIterator<T> incrementCounter(detail::CounterIterator<T> counter_iter,
78
57675
                                                       uint64_t increment) {
79
57675
    auto const bucket = counter_iter->bucket;
80
57675
    auto bucket_next = std::prev(bucket);
81
57675
    counter_iter->value += increment;
82

            
83
57675
    detail::CounterIterator<T> elem;
84
57675
    if (bucket_next != buckets_.end() && counter_iter->value == bucket_next->value) {
85
2358
      counter_iter->bucket = bucket_next;
86
2358
      bucket_next->children.splice(bucket_next->children.end(), bucket->children, counter_iter);
87
2358
      elem = std::prev(bucket_next->children.end());
88
55317
    } else {
89
55317
      auto bucket_new = buckets_.emplace(bucket, counter_iter->value);
90
55317
      counter_iter->bucket = bucket_new;
91
55317
      bucket_new->children.splice(bucket_new->children.end(), bucket->children, counter_iter);
92
55317
      elem = std::prev(bucket_new->children.end());
93
55317
    }
94
57675
    if (bucket->children.empty()) {
95
55238
      buckets_.erase(bucket);
96
55238
    }
97
57675
    return elem;
98
57675
  }
99

            
100
4
  absl::Status validateInternal() const {
101
4
    auto cache_copy = cache_;
102
4
    auto current_bucket = buckets_.begin();
103
4
    uint64_t value_sum = 0;
104
11
    while (current_bucket != buckets_.end()) {
105
7
      auto prev = std::prev(current_bucket);
106
7
      if (prev != buckets_.end() && prev->value <= current_bucket->value) {
107
        return absl::InternalError("buckets should be in descending order.");
108
      }
109
7
      auto current_child = current_bucket->children.begin();
110
21
      while (current_child != current_bucket->children.end()) {
111
14
        if (current_child->bucket != current_bucket ||
112
14
            current_child->value != current_bucket->value) {
113
          return absl::InternalError("entry does not point to a bucket with the same value.");
114
        }
115
14
        if (current_child->item) {
116
7
          auto old_iter = cache_copy.find(*current_child->item);
117
7
          if (old_iter != cache_copy.end()) {
118
7
            cache_copy.erase(old_iter);
119
7
          }
120
7
        }
121
14
        value_sum += current_child->value;
122
14
        current_child++;
123
14
      }
124
7
      current_bucket++;
125
7
    }
126
4
    if (!cache_copy.empty() || cache_.size() > capacity_ || value_sum != n_) {
127
      return absl::InternalError("unexpected size.");
128
    }
129
4
    return absl::OkStatus();
130
4
  }
131

            
132
57730
  inline void validateDbg() {
133
#if !defined(NDEBUG)
134
    ASSERT(validate().ok());
135
#endif
136
57730
  }
137

            
138
public:
139
55
  explicit StreamSummary(size_t capacity) : capacity_(capacity) {
140
55
    auto& new_bucket = buckets_.emplace_back(0);
141
4736
    for (size_t i = 0; i < capacity; ++i) {
142
      // initialize with empty counters, optional item will not be set
143
4681
      new_bucket.children.emplace_back(buckets_.begin());
144
4681
    }
145
55
    validateDbg();
146
55
  }
147

            
148
  size_t getCapacity() const { return capacity_; }
149

            
150
4
  absl::Status validate() const { return validateInternal(); }
151

            
152
57675
  Counter<T> offer(T const& item, uint64_t increment = 1) {
153
57675
    ++n_;
154
57675
    auto iter = cache_.find(item);
155
57675
    if (iter != cache_.end()) {
156
55474
      iter->second = incrementCounter(iter->second, increment);
157
55474
      validateDbg();
158
55474
      return *iter->second;
159
55478
    } else {
160
2201
      auto min_element = std::prev(buckets_.back().children.end());
161
2201
      auto original_min_value = min_element->value;
162
2201
      if (min_element
163
2201
              ->item) { // element was already used (otherwise optional item would be not set)
164
        // remove old from cache
165
2007
        auto old_iter = cache_.find(*min_element->item);
166
2007
        if (old_iter != cache_.end()) {
167
2007
          cache_.erase(old_iter);
168
2007
        }
169
2007
      }
170
2201
      min_element->item = item;
171
2201
      min_element = incrementCounter(min_element, increment);
172
2201
      cache_[item] = min_element;
173
2201
      if (cache_.size() <= capacity_) {
174
        // should always be true, but keep it to be aligned to reference implementation
175
        // originalMinValue will be 0 if element wasn't already used
176
2201
        min_element->error = original_min_value;
177
2201
      }
178
2201
      validateDbg();
179
2201
      return *min_element;
180
2201
    }
181
57675
  }
182

            
183
1413
  uint64_t getN() const { return n_; }
184

            
185
31
  typename std::list<Counter<T>> getTopK(size_t k = SIZE_MAX) const {
186
31
    std::list<Counter<T>> r;
187
118
    for (auto const& bucket : buckets_) {
188
1525
      for (auto const& child : bucket.children) {
189
1525
        if (child.item) {
190
210
          r.emplace_back(child);
191
210
          if (r.size() == k) {
192
6
            return r;
193
6
          }
194
210
        }
195
1525
      }
196
118
    }
197
25
    return r;
198
31
  }
199
};
200

            
201
} // namespace OpenTelemetry
202
} // namespace Tracers
203
} // namespace Extensions
204
} // namespace Envoy