/src/duckdb/third_party/tdigest/t_digest.hpp
Line | Count | Source |
1 | | /* |
2 | | * Licensed to Derrick R. Burns under one or more |
3 | | * contributor license agreements. See the NOTICES file distributed with |
4 | | * this work for additional information regarding copyright ownership. |
5 | | * The ASF licenses this file to You under the Apache License, Version 2.0 |
6 | | * (the "License"); you may not use this file except in compliance with |
7 | | * the License. You may obtain a copy of the License at |
8 | | * |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | * |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cfloat> |
22 | | #include <cmath> |
23 | | #include <queue> |
24 | | #include <utility> |
25 | | #include <vector> |
26 | | |
27 | | #ifdef min |
28 | | #undef min |
29 | | #endif |
30 | | |
31 | | #ifdef max |
32 | | #undef max |
33 | | #endif |
34 | | |
35 | | |
36 | | namespace duckdb_tdigest { |
37 | | |
38 | | using Value = double; |
39 | | using Weight = double; |
40 | | using Index = size_t; |
41 | | |
42 | | const size_t kHighWater = 40000; |
43 | | const double pi = 3.14159265358979323846; |
44 | | class Centroid { |
45 | | public: |
46 | 0 | Centroid() : Centroid(0.0, 0.0) { |
47 | 0 | } |
48 | | |
49 | 8.77k | Centroid(Value mean, Weight weight) : mean_(mean), weight_(weight) { |
50 | 8.77k | } |
51 | | |
52 | 26.3k | inline Value mean() const noexcept { |
53 | 26.3k | return mean_; |
54 | 26.3k | } |
55 | | |
56 | 17.5k | inline Weight weight() const noexcept { |
57 | 17.5k | return weight_; |
58 | 17.5k | } |
59 | | |
60 | 0 | inline void add(const Centroid &c) { |
61 | | // CHECK_GT(c.weight_, 0); |
62 | 0 | if (weight_ != 0.0) { |
63 | 0 | weight_ += c.weight_; |
64 | 0 | mean_ += c.weight_ * (c.mean_ - mean_) / weight_; |
65 | 0 | } else { |
66 | 0 | weight_ = c.weight_; |
67 | 0 | mean_ = c.mean_; |
68 | 0 | } |
69 | 0 | } |
70 | | |
71 | | private: |
72 | | Value mean_ = 0; |
73 | | Weight weight_ = 0; |
74 | | }; |
75 | | |
76 | | struct CentroidList { |
77 | 0 | explicit CentroidList(const std::vector<Centroid> &s) : iter(s.cbegin()), end(s.cend()) { |
78 | 0 | } |
79 | | std::vector<Centroid>::const_iterator iter; |
80 | | std::vector<Centroid>::const_iterator end; |
81 | | |
82 | 0 | bool advance() { |
83 | 0 | return ++iter != end; |
84 | 0 | } |
85 | | }; |
86 | | |
87 | | class CentroidListComparator { |
88 | | public: |
89 | 8.77k | CentroidListComparator() { |
90 | 8.77k | } |
91 | | |
92 | 0 | bool operator()(const CentroidList &left, const CentroidList &right) const { |
93 | 0 | return left.iter->mean() > right.iter->mean(); |
94 | 0 | } |
95 | | }; |
96 | | |
97 | | using CentroidListQueue = std::priority_queue<CentroidList, std::vector<CentroidList>, CentroidListComparator>; |
98 | | |
99 | | struct CentroidComparator { |
100 | 0 | bool operator()(const Centroid &a, const Centroid &b) const { |
101 | 0 | return a.mean() < b.mean(); |
102 | 0 | } |
103 | | }; |
104 | | |
105 | | class TDigest { |
106 | | class TDigestComparator { |
107 | | public: |
108 | 8.77k | TDigestComparator() { |
109 | 8.77k | } |
110 | | |
111 | 0 | bool operator()(const TDigest *left, const TDigest *right) const { |
112 | 0 | return left->totalSize() > right->totalSize(); |
113 | 0 | } |
114 | | }; |
115 | | |
116 | | using TDigestQueue = std::priority_queue<const TDigest *, std::vector<const TDigest *>, TDigestComparator>; |
117 | | |
118 | | public: |
119 | 0 | TDigest() : TDigest(1000) { |
120 | 0 | } |
121 | | |
122 | 17.5k | explicit TDigest(Value compression) : TDigest(compression, 0) { |
123 | 17.5k | } |
124 | | |
125 | 17.5k | TDigest(Value compression, Index bufferSize) : TDigest(compression, bufferSize, 0) { |
126 | 17.5k | } |
127 | | |
128 | | TDigest(Value compression, Index unmergedSize, Index mergedSize) |
129 | 17.5k | : compression_(compression), maxProcessed_(processedSize(mergedSize, compression)), |
130 | 17.5k | maxUnprocessed_(unprocessedSize(unmergedSize, compression)) { |
131 | 17.5k | processed_.reserve(maxProcessed_); |
132 | 17.5k | unprocessed_.reserve(maxUnprocessed_ + 1); |
133 | 17.5k | } |
134 | | |
135 | | TDigest(std::vector<Centroid> &&processed, std::vector<Centroid> &&unprocessed, Value compression, |
136 | | Index unmergedSize, Index mergedSize) |
137 | 0 | : TDigest(compression, unmergedSize, mergedSize) { |
138 | 0 | processed_ = std::move(processed); |
139 | 0 | unprocessed_ = std::move(unprocessed); |
140 | 0 | |
141 | 0 | processedWeight_ = weight(processed_); |
142 | 0 | unprocessedWeight_ = weight(unprocessed_); |
143 | 0 | if (!processed_.empty()) { |
144 | 0 | min_ = std::min(min_, processed_[0].mean()); |
145 | 0 | max_ = std::max(max_, (processed_.cend() - 1)->mean()); |
146 | 0 | } |
147 | 0 | updateCumulative(); |
148 | 0 | } |
149 | | |
150 | 0 | static Weight weight(std::vector<Centroid> ¢roids) noexcept { |
151 | 0 | Weight w = 0.0; |
152 | 0 | for (auto centroid : centroids) { |
153 | 0 | w += centroid.weight(); |
154 | 0 | } |
155 | 0 | return w; |
156 | 0 | } |
157 | | |
158 | 0 | TDigest &operator=(TDigest &&o) { |
159 | 0 | compression_ = o.compression_; |
160 | 0 | maxProcessed_ = o.maxProcessed_; |
161 | 0 | maxUnprocessed_ = o.maxUnprocessed_; |
162 | 0 | processedWeight_ = o.processedWeight_; |
163 | 0 | unprocessedWeight_ = o.unprocessedWeight_; |
164 | 0 | processed_ = std::move(o.processed_); |
165 | 0 | unprocessed_ = std::move(o.unprocessed_); |
166 | 0 | cumulative_ = std::move(o.cumulative_); |
167 | 0 | min_ = o.min_; |
168 | 0 | max_ = o.max_; |
169 | 0 | return *this; |
170 | 0 | } |
171 | | |
172 | | TDigest(TDigest &&o) |
173 | | : TDigest(std::move(o.processed_), std::move(o.unprocessed_), o.compression_, o.maxUnprocessed_, |
174 | 0 | o.maxProcessed_) { |
175 | 0 | } |
176 | | |
177 | 17.5k | static inline Index processedSize(Index size, Value compression) noexcept { |
178 | 17.5k | return (size == 0) ? static_cast<Index>(2 * std::ceil(compression)) : size; |
179 | 17.5k | } |
180 | | |
181 | 17.5k | static inline Index unprocessedSize(Index size, Value compression) noexcept { |
182 | 17.5k | return (size == 0) ? static_cast<Index>(8 * std::ceil(compression)) : size; |
183 | 17.5k | } |
184 | | |
185 | | // merge in another t-digest |
186 | 8.77k | inline void merge(const TDigest *other) { |
187 | 8.77k | std::vector<const TDigest *> others {other}; |
188 | 8.77k | add(others.cbegin(), others.cend()); |
189 | 8.77k | } |
190 | | |
191 | 0 | const std::vector<Centroid> &processed() const { |
192 | 0 | return processed_; |
193 | 0 | } |
194 | | |
195 | 0 | const std::vector<Centroid> &unprocessed() const { |
196 | 0 | return unprocessed_; |
197 | 0 | } |
198 | | |
199 | 0 | Index maxUnprocessed() const { |
200 | 0 | return maxUnprocessed_; |
201 | 0 | } |
202 | | |
203 | 0 | Index maxProcessed() const { |
204 | 0 | return maxProcessed_; |
205 | 0 | } |
206 | | |
207 | 0 | inline void add(std::vector<const TDigest *> digests) { |
208 | 0 | add(digests.cbegin(), digests.cend()); |
209 | 0 | } |
210 | | |
211 | | // merge in a vector of tdigests in the most efficient manner possible |
212 | | // in constant space |
213 | | // works for any value of kHighWater |
214 | 8.77k | void add(std::vector<const TDigest *>::const_iterator iter, std::vector<const TDigest *>::const_iterator end) { |
215 | 8.77k | if (iter != end) { |
216 | 8.77k | auto size = std::distance(iter, end); |
217 | 8.77k | TDigestQueue pq(TDigestComparator {}); |
218 | 17.5k | for (; iter != end; iter++) { |
219 | 8.77k | pq.push((*iter)); |
220 | 8.77k | } |
221 | 8.77k | std::vector<const TDigest *> batch; |
222 | 8.77k | batch.reserve(size_t(size)); |
223 | | |
224 | 8.77k | size_t totalSize = 0; |
225 | 17.5k | while (!pq.empty()) { |
226 | 8.77k | auto td = pq.top(); |
227 | 8.77k | batch.push_back(td); |
228 | 8.77k | pq.pop(); |
229 | 8.77k | totalSize += td->totalSize(); |
230 | 8.77k | if (totalSize >= kHighWater || pq.empty()) { |
231 | 8.77k | mergeProcessed(batch); |
232 | 8.77k | mergeUnprocessed(batch); |
233 | 8.77k | processIfNecessary(); |
234 | 8.77k | batch.clear(); |
235 | 8.77k | totalSize = 0; |
236 | 8.77k | } |
237 | 8.77k | } |
238 | 8.77k | updateCumulative(); |
239 | 8.77k | } |
240 | 8.77k | } |
241 | | |
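The batched add() above is the intended way to fold many digests together. A minimal sketch, assuming the header is included under the path shown at the top of this listing (the helper name merge_partials and the compression value are illustrative only):

```cpp
#include <vector>

#include "t_digest.hpp" // duckdb/third_party/tdigest/t_digest.hpp

using duckdb_tdigest::TDigest;

// Fold several partial digests (e.g. one per thread or partition) into one,
// using the constant-space batched add() shown above.
void merge_partials(TDigest &result, const std::vector<const TDigest *> &partials) {
	result.add(partials.cbegin(), partials.cend());
	result.compress(); // process any centroids still sitting in the unprocessed buffer
}
```

For exactly two digests, the pairwise merge(other) member above does the same thing with a one-element batch.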
242 | 0 | Weight processedWeight() const { |
243 | 0 | return processedWeight_; |
244 | 0 | } |
245 | | |
246 | 0 | Weight unprocessedWeight() const { |
247 | 0 | return unprocessedWeight_; |
248 | 0 | } |
249 | | |
250 | 8.77k | bool haveUnprocessed() const { |
251 | 8.77k | return unprocessed_.size() > 0; |
252 | 8.77k | } |
253 | | |
254 | 8.77k | size_t totalSize() const { |
255 | 8.77k | return processed_.size() + unprocessed_.size(); |
256 | 8.77k | } |
257 | | |
258 | 0 | long totalWeight() const { |
259 | 0 | return static_cast<long>(processedWeight_ + unprocessedWeight_); |
260 | 0 | } |
261 | | |
262 | | // return the cdf on the t-digest |
263 | 0 | Value cdf(Value x) { |
264 | 0 | if (haveUnprocessed() || isDirty()) { |
265 | 0 | process(); |
266 | 0 | } |
267 | 0 | return cdfProcessed(x); |
268 | 0 | } |
269 | | |
270 | 26.3k | bool isDirty() { |
271 | 26.3k | return processed_.size() > maxProcessed_ || unprocessed_.size() > maxUnprocessed_; |
272 | 26.3k | } |
273 | | |
274 | | // return the cdf on the processed values |
275 | 0 | Value cdfProcessed(Value x) const { |
276 | 0 | if (processed_.empty()) { |
277 | 0 | // no data to examine |
278 | 0 | |
279 | 0 | return 0.0; |
280 | 0 | } else if (processed_.size() == 1) { |
281 | 0 | // exactly one centroid, should have max_==min_ |
282 | 0 | auto width = max_ - min_; |
283 | 0 | if (x < min_) { |
284 | 0 | return 0.0; |
285 | 0 | } else if (x > max_) { |
286 | 0 | return 1.0; |
287 | 0 | } else if (x - min_ <= width) { |
288 | 0 | // min_ and max_ are too close together to do any viable interpolation |
289 | 0 | return 0.5; |
290 | 0 | } else { |
291 | 0 | // interpolate if somehow we have weight > 0 and max_ != min_ |
292 | 0 | return (x - min_) / (max_ - min_); |
293 | 0 | } |
294 | 0 | } else { |
295 | 0 | auto n = processed_.size(); |
296 | 0 | if (x <= min_) { |
297 | 0 | return 0; |
298 | 0 | } |
299 | 0 | |
300 | 0 | if (x >= max_) { |
301 | 0 | return 1; |
302 | 0 | } |
303 | 0 | |
304 | 0 | // check for the left tail |
305 | 0 | if (x <= mean(0)) { |
306 | 0 | |
307 | 0 | // note that this is different than mean(0) > min_ ... this guarantees interpolation works |
308 | 0 | if (mean(0) - min_ > 0) { |
309 | 0 | return (x - min_) / (mean(0) - min_) * weight(0) / processedWeight_ / 2.0; |
310 | 0 | } else { |
311 | 0 | return 0; |
312 | 0 | } |
313 | 0 | } |
314 | 0 | |
315 | 0 | // and the right tail |
316 | 0 | if (x >= mean(n - 1)) { |
317 | 0 | if (max_ - mean(n - 1) > 0) { |
318 | 0 | return 1.0 - (max_ - x) / (max_ - mean(n - 1)) * weight(n - 1) / processedWeight_ / 2.0; |
319 | 0 | } else { |
320 | 0 | return 1; |
321 | 0 | } |
322 | 0 | } |
323 | 0 | |
324 | 0 | CentroidComparator cc; |
325 | 0 | auto iter = std::upper_bound(processed_.cbegin(), processed_.cend(), Centroid(x, 0), cc); |
326 | 0 | |
327 | 0 | auto i = size_t(std::distance(processed_.cbegin(), iter)); |
328 | 0 | auto z1 = x - (iter - 1)->mean(); |
329 | 0 | auto z2 = (iter)->mean() - x; |
330 | 0 | return weightedAverage(cumulative_[i - 1], z2, cumulative_[i], z1) / processedWeight_; |
331 | 0 | } |
332 | 0 | } |
333 | | |
334 | | // this returns a quantile on the t-digest |
335 | 8.77k | Value quantile(Value q) { |
336 | 8.77k | if (haveUnprocessed() || isDirty()) { |
337 | 0 | process(); |
338 | 0 | } |
339 | 8.77k | return quantileProcessed(q); |
340 | 8.77k | } |
341 | | |
342 | | // this returns a quantile on the currently processed values without changing the t-digest |
343 | | // the value will not represent the unprocessed values |
344 | 8.77k | Value quantileProcessed(Value q) const { |
345 | 8.77k | if (q < 0 || q > 1) { |
346 | 0 | return NAN; |
347 | 0 | } |
348 | | |
349 | 8.77k | if (processed_.size() == 0) { |
350 | | // no centroids means no data, no way to get a quantile |
351 | 0 | return NAN; |
352 | 8.77k | } else if (processed_.size() == 1) { |
353 | | // with one data point, all quantiles lead to Rome |
354 | | |
355 | 8.77k | return mean(0); |
356 | 8.77k | } |
357 | | |
358 | | // we know that there are at least two centroids now |
359 | 0 | auto n = processed_.size(); |
360 | | |
361 | | // if values were stored in a sorted array, index would be the offset we are interested in |
362 | 0 | const auto index = q * processedWeight_; |
363 | | |
364 | | // at the boundaries, we return min_ or max_ |
365 | 0 | if (index <= weight(0) / 2.0) { |
366 | 0 | return min_ + 2.0 * index / weight(0) * (mean(0) - min_); |
367 | 0 | } |
368 | | |
369 | 0 | auto iter = std::lower_bound(cumulative_.cbegin(), cumulative_.cend(), index); |
370 | | |
371 | 0 | if (iter + 1 != cumulative_.cend()) { |
372 | 0 | auto i = size_t(std::distance(cumulative_.cbegin(), iter)); |
373 | 0 | auto z1 = index - *(iter - 1); |
374 | 0 | auto z2 = *(iter) - index; |
375 | | // LOG(INFO) << "z2 " << z2 << " index " << index << " z1 " << z1; |
376 | 0 | return weightedAverage(mean(i - 1), z2, mean(i), z1); |
377 | 0 | } |
378 | 0 | auto z1 = index - processedWeight_ - weight(n - 1) / 2.0; |
379 | 0 | auto z2 = weight(n - 1) / 2 - z1; |
380 | 0 | return weightedAverage(mean(n - 1), z1, max_, z2); |
381 | 0 | } |
382 | | |
383 | 0 | Value compression() const { |
384 | 0 | return compression_; |
385 | 0 | } |
386 | | |
387 | 8.77k | void add(Value x) { |
388 | 8.77k | add(x, 1); |
389 | 8.77k | } |
390 | | |
391 | 8.77k | inline void compress() { |
392 | 8.77k | process(); |
393 | 8.77k | } |
394 | | |
395 | | // add a single centroid to the unprocessed vector, processing previously unprocessed centroids if our limit has |
396 | | // been reached. |
397 | 8.77k | inline bool add(Value x, Weight w) { |
398 | 8.77k | if (std::isnan(x)) { |
399 | 0 | return false; |
400 | 0 | } |
401 | 8.77k | unprocessed_.push_back(Centroid(x, w)); |
402 | 8.77k | unprocessedWeight_ += w; |
403 | 8.77k | processIfNecessary(); |
404 | 8.77k | return true; |
405 | 8.77k | } |
406 | | |
407 | 0 | inline void add(std::vector<Centroid>::const_iterator iter, std::vector<Centroid>::const_iterator end) { |
408 | 0 | while (iter != end) { |
409 | 0 | const size_t diff = size_t(std::distance(iter, end)); |
410 | 0 | const size_t room = maxUnprocessed_ - unprocessed_.size(); |
411 | 0 | auto mid = iter + int64_t(std::min(diff, room)); |
412 | 0 | while (iter != mid) { |
413 | 0 | unprocessed_.push_back(*(iter++)); |
414 | 0 | } |
415 | 0 | if (unprocessed_.size() >= maxUnprocessed_) { |
416 | 0 | process(); |
417 | 0 | } |
418 | 0 | } |
419 | 0 | } |
420 | | |
421 | | private: |
422 | | Value compression_; |
423 | | |
424 | | Value min_ = std::numeric_limits<Value>::max(); |
425 | | |
426 | | Value max_ = std::numeric_limits<Value>::lowest(); // lowest(), not min(): min() is the smallest positive double |
427 | | |
428 | | Index maxProcessed_; |
429 | | |
430 | | Index maxUnprocessed_; |
431 | | |
432 | | Value processedWeight_ = 0.0; |
433 | | |
434 | | Value unprocessedWeight_ = 0.0; |
435 | | |
436 | | std::vector<Centroid> processed_; |
437 | | |
438 | | std::vector<Centroid> unprocessed_; |
439 | | |
440 | | std::vector<Weight> cumulative_; |
441 | | |
442 | | // return mean of i-th centroid |
443 | 8.77k | inline Value mean(size_t i) const noexcept { |
444 | 8.77k | return processed_[i].mean(); |
445 | 8.77k | } |
446 | | |
447 | | // return weight of i-th centroid |
448 | 8.77k | inline Weight weight(size_t i) const noexcept { |
449 | 8.77k | return processed_[i].weight(); |
450 | 8.77k | } |
451 | | |
452 | | // append all unprocessed centroids into current unprocessed vector |
453 | 8.77k | void mergeUnprocessed(const std::vector<const TDigest *> &tdigests) { |
454 | 8.77k | if (tdigests.size() == 0) { |
455 | 0 | return; |
456 | 0 | } |
457 | | |
458 | 8.77k | size_t total = unprocessed_.size(); |
459 | 8.77k | for (auto &td : tdigests) { |
460 | 8.77k | total += td->unprocessed_.size(); |
461 | 8.77k | } |
462 | | |
463 | 8.77k | unprocessed_.reserve(total); |
464 | 8.77k | for (auto &td : tdigests) { |
465 | 8.77k | unprocessed_.insert(unprocessed_.end(), td->unprocessed_.cbegin(), td->unprocessed_.cend()); |
466 | 8.77k | unprocessedWeight_ += td->unprocessedWeight_; |
467 | 8.77k | } |
468 | 8.77k | } |
469 | | |
470 | | // merge all processed centroids together into a single sorted vector |
471 | 8.77k | void mergeProcessed(const std::vector<const TDigest *> &tdigests) { |
472 | 8.77k | if (tdigests.size() == 0) { |
473 | 0 | return; |
474 | 0 | } |
475 | | |
476 | 8.77k | size_t total = 0; |
477 | 8.77k | CentroidListQueue pq(CentroidListComparator {}); |
478 | 8.77k | for (auto &td : tdigests) { |
479 | 8.77k | auto &sorted = td->processed_; |
480 | 8.77k | auto size = sorted.size(); |
481 | 8.77k | if (size > 0) { |
482 | 0 | pq.push(CentroidList(sorted)); |
483 | 0 | total += size; |
484 | 0 | processedWeight_ += td->processedWeight_; |
485 | 0 | } |
486 | 8.77k | } |
487 | 8.77k | if (total == 0) { |
488 | 8.77k | return; |
489 | 8.77k | } |
490 | | |
491 | 0 | if (processed_.size() > 0) { |
492 | 0 | pq.push(CentroidList(processed_)); |
493 | 0 | total += processed_.size(); |
494 | 0 | } |
495 | | |
496 | 0 | std::vector<Centroid> sorted; |
497 | 0 | sorted.reserve(total); |
498 | | |
499 | 0 | while (!pq.empty()) { |
500 | 0 | auto best = pq.top(); |
501 | 0 | pq.pop(); |
502 | 0 | sorted.push_back(*(best.iter)); |
503 | 0 | if (best.advance()) { |
504 | 0 | pq.push(best); |
505 | 0 | } |
506 | 0 | } |
507 | 0 | processed_ = std::move(sorted); |
508 | 0 | if (processed_.size() > 0) { |
509 | 0 | min_ = std::min(min_, processed_[0].mean()); |
510 | 0 | max_ = std::max(max_, (processed_.cend() - 1)->mean()); |
511 | 0 | } |
512 | 0 | } |
513 | | |
514 | 17.5k | inline void processIfNecessary() { |
515 | 17.5k | if (isDirty()) { |
516 | 0 | process(); |
517 | 0 | } |
518 | 17.5k | } |
519 | | |
520 | 17.5k | void updateCumulative() { |
521 | 17.5k | const auto n = processed_.size(); |
522 | 17.5k | cumulative_.clear(); |
523 | 17.5k | cumulative_.reserve(n + 1); |
524 | 17.5k | auto previous = 0.0; |
525 | 26.3k | for (Index i = 0; i < n; i++) { |
526 | 8.77k | auto current = weight(i); |
527 | 8.77k | auto halfCurrent = current / 2.0; |
528 | 8.77k | cumulative_.push_back(previous + halfCurrent); |
529 | 8.77k | previous = previous + current; |
530 | 8.77k | } |
531 | 17.5k | cumulative_.push_back(previous); |
532 | 17.5k | } |
533 | | |
534 | | // merges unprocessed_ centroids and processed_ centroids together and processes them |
535 | | // when complete, unprocessed_ will be empty and processed_ will have at most maxProcessed_ centroids |
536 | 8.77k | inline void process() { |
537 | 8.77k | CentroidComparator cc; |
538 | 8.77k | std::sort(unprocessed_.begin(), unprocessed_.end(), cc); |
539 | 8.77k | auto count = unprocessed_.size(); |
540 | 8.77k | unprocessed_.insert(unprocessed_.end(), processed_.cbegin(), processed_.cend()); |
541 | 8.77k | std::inplace_merge(unprocessed_.begin(), unprocessed_.begin() + int64_t(count), unprocessed_.end(), cc); |
542 | | |
543 | 8.77k | processedWeight_ += unprocessedWeight_; |
544 | 8.77k | unprocessedWeight_ = 0; |
545 | 8.77k | processed_.clear(); |
546 | | |
547 | 8.77k | processed_.push_back(unprocessed_[0]); |
548 | 8.77k | Weight wSoFar = unprocessed_[0].weight(); |
549 | 8.77k | Weight wLimit = processedWeight_ * integratedQ(1.0); |
550 | | |
551 | 8.77k | auto end = unprocessed_.end(); |
552 | 8.77k | for (auto iter = unprocessed_.cbegin() + 1; iter < end; iter++) { |
553 | 0 | auto ¢roid = *iter; |
554 | 0 | Weight projectedW = wSoFar + centroid.weight(); |
555 | 0 | if (projectedW <= wLimit) { |
556 | 0 | wSoFar = projectedW; |
557 | 0 | (processed_.end() - 1)->add(centroid); |
558 | 0 | } else { |
559 | 0 | auto k1 = integratedLocation(wSoFar / processedWeight_); |
560 | 0 | wLimit = processedWeight_ * integratedQ(k1 + 1.0); |
561 | 0 | wSoFar += centroid.weight(); |
562 | 0 | processed_.emplace_back(centroid); |
563 | 0 | } |
564 | 0 | } |
565 | 8.77k | unprocessed_.clear(); |
566 | 8.77k | min_ = std::min(min_, processed_[0].mean()); |
567 | 8.77k | max_ = std::max(max_, (processed_.cend() - 1)->mean()); |
568 | 8.77k | updateCumulative(); |
569 | 8.77k | } |
570 | | |
571 | 0 | inline size_t checkWeights() { |
572 | 0 | return checkWeights(processed_, processedWeight_); |
573 | 0 | } |
574 | | |
575 | 0 | size_t checkWeights(const std::vector<Centroid> &sorted, Value total) { |
576 | 0 | size_t badWeight = 0; |
577 | 0 | auto k1 = 0.0; |
578 | 0 | auto q = 0.0; |
579 | 0 | for (auto iter = sorted.cbegin(); iter != sorted.cend(); iter++) { |
580 | 0 | auto w = iter->weight(); |
581 | 0 | auto dq = w / total; |
582 | 0 | auto k2 = integratedLocation(q + dq); |
583 | 0 | if (k2 - k1 > 1 && w != 1) { |
584 | 0 | badWeight++; |
585 | 0 | } |
586 | 0 | if (k2 - k1 > 1.5 && w != 1) { |
587 | 0 | badWeight++; |
588 | 0 | } |
589 | 0 | q += dq; |
590 | 0 | k1 = k2; |
591 | 0 | } |
592 | 0 | |
593 | 0 | return badWeight; |
594 | 0 | } |
595 | | |
596 | | /** |
597 | | * Converts a quantile into a centroid scale value. The centroid scale is nominally |
598 | | * the number k of the centroid that a quantile point q should belong to. Due to |
599 | | * round-offs, however, we can't align things perfectly without splitting points |
600 | | * and centroids. We don't want to do that, so we have to allow for offsets. |
601 | | * In the end, the criterion is that any quantile range that spans a centroid |
602 | | * scale range more than one should be split across more than one centroid if |
603 | | * possible. This won't be possible if the quantile range refers to a single point |
604 | | * or an already existing centroid. |
605 | | * <p/> |
606 | | * This mapping is steep near q=0 or q=1 so each centroid there will correspond to |
607 | | * less q range. Near q=0.5, the mapping is flatter so that centroids there will |
608 | | * represent a larger chunk of quantiles. |
609 | | * |
610 | | * @param q The quantile scale value to be mapped. |
611 | | * @return The centroid scale value corresponding to q. |
612 | | */ |
613 | 0 | inline Value integratedLocation(Value q) const { |
614 | 0 | return compression_ * (std::asin(2.0 * q - 1.0) + pi / 2) / pi; |
615 | 0 | } |
616 | | |
617 | 8.77k | inline Value integratedQ(Value k) const { |
618 | 8.77k | return (std::sin(std::min(k, compression_) * pi / compression_ - pi / 2) + 1) / 2; |
619 | 8.77k | } |
620 | | |
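integratedLocation and integratedQ above are inverse scale functions: k(q) = compression * (asin(2q - 1) + pi/2) / pi maps a quantile to a centroid scale value, and q(k) maps back. A standalone sketch of the same formulas (the names scale_k and scale_q are only illustrative), showing that the mapping is steep near q = 0 and q = 1 and flat near q = 0.5, which is what keeps tail centroids small:

```cpp
#include <algorithm>
#include <cmath>
#include <cstdio>

static const double kPi = 3.14159265358979323846;

// quantile -> centroid scale, mirroring integratedLocation above
static double scale_k(double q, double compression) {
	return compression * (std::asin(2.0 * q - 1.0) + kPi / 2) / kPi;
}

// centroid scale -> quantile, mirroring integratedQ above
static double scale_q(double k, double compression) {
	return (std::sin(std::min(k, compression) * kPi / compression - kPi / 2) + 1) / 2;
}

int main() {
	const double compression = 100.0; // illustrative value
	const double qs[] = {0.01, 0.25, 0.50, 0.75, 0.99};
	for (double q : qs) {
		double k = scale_k(q, compression);
		// round-tripping recovers q; dk/dq is largest near the tails
		std::printf("q=%.2f  k=%7.3f  q(k)=%.2f\n", q, k, scale_q(k, compression));
	}
	return 0;
}
```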
621 | | /** |
622 | | * Same as {@link #weightedAverageSorted(Value, Value, Value, Value)} but flips |
623 | | * the order of the variables if <code>x1</code> is greater than |
624 | | * <code>x2</code>. |
625 | | */ |
626 | 0 | static Value weightedAverage(Value x1, Value w1, Value x2, Value w2) { |
627 | 0 | return (x1 <= x2) ? weightedAverageSorted(x1, w1, x2, w2) : weightedAverageSorted(x2, w2, x1, w1); |
628 | 0 | } |
629 | | |
630 | | /** |
631 | | * Compute the weighted average between <code>x1</code> with a weight of |
632 | | * <code>w1</code> and <code>x2</code> with a weight of <code>w2</code>. |
633 | | * This expects <code>x1</code> to be less than or equal to <code>x2</code> |
634 | | * and is guaranteed to return a number between <code>x1</code> and |
635 | | * <code>x2</code>. |
636 | | */ |
637 | 0 | static Value weightedAverageSorted(Value x1, Value w1, Value x2, Value w2) { |
638 | 0 | const Value x = (x1 * w1 + x2 * w2) / (w1 + w2); |
639 | 0 | return std::max(x1, std::min(x, x2)); |
640 | 0 | } |
641 | | |
642 | 0 | static Value interpolate(Value x, Value x0, Value x1) { |
643 | 0 | return (x - x0) / (x1 - x0); |
644 | 0 | } |
645 | | |
646 | | /** |
647 | | * Computes an interpolated value of a quantile that is between two centroids. |
648 | | * |
649 | | * Index is the quantile desired multiplied by the total number of samples - 1. |
650 | | * |
651 | | * @param index Denormalized quantile desired |
652 | | * @param previousIndex The denormalized quantile corresponding to the center of the previous centroid. |
653 | | * @param nextIndex The denormalized quantile corresponding to the center of the following centroid. |
654 | | * @param previousMean The mean of the previous centroid. |
655 | | * @param nextMean The mean of the following centroid. |
656 | | * @return The interpolated mean. |
657 | | */ |
658 | 0 | static Value quantile(Value index, Value previousIndex, Value nextIndex, Value previousMean, Value nextMean) { |
659 | 0 | const auto delta = nextIndex - previousIndex; |
660 | 0 | const auto previousWeight = (nextIndex - index) / delta; |
661 | 0 | const auto nextWeight = (index - previousIndex) / delta; |
662 | 0 | return previousMean * previousWeight + nextMean * nextWeight; |
663 | 0 | } |
664 | | }; |
665 | | |
666 | | } // namespace duckdb_tdigest |
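For reference, a minimal usage sketch of the public interface in this header (the compression value of 100 and the sample data are illustrative only):

```cpp
#include <cstdio>

#include "t_digest.hpp" // duckdb/third_party/tdigest/t_digest.hpp

int main() {
	duckdb_tdigest::TDigest td(100.0); // compression trades accuracy for size

	// stream in values; they are buffered as unprocessed centroids and
	// merged lazily once the buffer limit is reached
	for (int i = 1; i <= 1000; i++) {
		td.add(static_cast<double>(i));
	}
	td.compress(); // force processing of anything still buffered

	// approximate order statistics
	std::printf("median   ~ %f\n", td.quantile(0.5));
	std::printf("p99      ~ %f\n", td.quantile(0.99));
	std::printf("cdf(250) ~ %f\n", td.cdf(250.0));
	return 0;
}
```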