Coverage Report

Created: 2023-11-19 06:38

/src/brpc/src/brpc/details/sparse_minute_counter.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#ifndef BRPC_SPARSE_MINUTE_COUNTER_H
20
#define BRPC_SPARSE_MINUTE_COUNTER_H
21
22
23
#include "butil/containers/bounded_queue.h"
24
25
namespace brpc {
26
27
// A utility to add up per-second values into a per-minute value.
28
// About "sparse":
29
//   This utility assumes that when a lot of instances exist, many of them do
30
//   not need to update every second. This is true for stats of connections.
31
//   When a lot of connections(>100K) connected to one server, most of them
32
//   are idle since the overall activities of all connections are limited
33
//   by throughput of the server. To make use of the fact, this utility stores
34
//   per-second values in a sparse array tagged with timestamps. The array
35
//   is resized on-demand to save memory.
36
template <typename T> class SparseMinuteCounter {
37
    struct Item {
38
        int64_t timestamp_ms;
39
        T value;
40
0
        Item() : timestamp_ms(0) {}
41
0
        Item(int64_t ts, const T& v) : timestamp_ms(ts), value(v) {}
42
    };
43
public:
44
0
    SparseMinuteCounter() : _q(NULL) {}
45
0
    ~SparseMinuteCounter() { DestroyQueue(_q); }
46
47
    // Add `value' into this counter at timestamp `now_ms'
48
    // Returns true when old value is popped and set into *popped.
49
    bool Add(int64_t now_ms, const T& value, T* popped);
50
    
51
    // Try to pop value before one minute.
52
    // Returns true when old value is popped and set into *popped.
53
    bool TryPop(int64_t now_ms, T* popped);
54
55
private:
56
    DISALLOW_COPY_AND_ASSIGN(SparseMinuteCounter);
57
    typedef butil::BoundedQueue<Item> Q;
58
    static Q* CreateQueue(uint32_t cap);
59
    static void DestroyQueue(Q* q);
60
    void Resize();
61
62
    Q* _q;
63
    Item _first_item;
64
};
65
66
template <typename T>
67
0
bool SparseMinuteCounter<T>::Add(int64_t now_ms, const T& val, T* popped) {
68
0
    if (_q) { // more common
69
0
        Item new_item(now_ms, val);
70
0
        if (_q->full()) {
71
0
            const Item* const oldest = _q->top();
72
0
            if (now_ms < oldest->timestamp_ms + 60000 &&
73
0
                _q->capacity() < 60) {
74
0
                Resize();
75
0
                _q->push(new_item);
76
0
                return false;
77
0
            } else {
78
0
                *popped = oldest->value;
79
0
                _q->pop();
80
0
                _q->push(new_item);
81
0
                return true;
82
0
            }
83
0
        } else {
84
0
            _q->push(new_item);
85
0
            return false;
86
0
        }
87
0
    } else {
88
        // first-time storing is different. If Add() is rarely called,
89
        // This strategy may not allocate _q at all.
90
0
        if (_first_item.timestamp_ms == 0) {
91
0
            _first_item.timestamp_ms = std::max(now_ms, (int64_t)1);
92
0
            _first_item.value = val;
93
0
            return false;
94
0
        }
95
0
        const int64_t delta = now_ms - _first_item.timestamp_ms;
96
0
        if (delta >= 60000) {
97
0
            *popped = _first_item.value;
98
0
            _first_item.timestamp_ms = std::max(now_ms, (int64_t)1);
99
0
            _first_item.value = val;
100
0
            return true;
101
0
        }
102
        // Predict initial capacity of _q according to interval between
103
        // now_ms and last timestamp.
104
0
        int64_t initial_cap = (delta <= 1000 ? 30 : (60000 + delta - 1) / delta);
105
0
        if (initial_cap < 4) {
106
0
            initial_cap = 4;
107
0
        }
108
0
        _q = CreateQueue(initial_cap);
109
0
        _q->push(_first_item);
110
0
        _q->push(Item(now_ms, val));
111
0
        return false;
112
0
    }
113
0
}
114
115
template <typename T>
116
typename SparseMinuteCounter<T>::Q*
117
0
SparseMinuteCounter<T>::CreateQueue(uint32_t cap) {
118
0
    const size_t memsize =
119
0
        sizeof(Q) + sizeof(Item) * cap;
120
0
    char* mem = (char*)malloc(memsize); // intended crash on ENOMEM
121
0
    return new (mem) Q(mem + sizeof(Q), sizeof(Item) * cap, butil::NOT_OWN_STORAGE);
122
0
}
123
124
template <typename T>
125
0
void SparseMinuteCounter<T>::DestroyQueue(Q* q) {
126
0
    if (q) {
127
0
        q->~Q();
128
0
        free(q);
129
0
    }
130
0
}
131
132
template <typename T>
133
0
void SparseMinuteCounter<T>::Resize() {
134
0
    CHECK_LT(_q->capacity(), (size_t)60);
135
0
    uint32_t new_cap = std::min(2 * (uint32_t)_q->capacity(), 60u);
136
0
    Q* new_q = CreateQueue(new_cap);
137
0
    for (size_t i = 0; i < _q->size(); ++i) {
138
0
        new_q->push(*_q->top(i));
139
0
    }
140
0
    DestroyQueue(_q);
141
0
    _q = new_q;
142
0
}
143
144
template <typename T>
145
0
bool SparseMinuteCounter<T>::TryPop(int64_t now_ms, T* popped) {
146
0
    if (_q) {
147
0
        const Item* const oldest = _q->top();
148
0
        if (oldest == NULL || now_ms < oldest->timestamp_ms + 60000) {
149
0
            return false;
150
0
        }
151
0
        *popped = oldest->value;
152
0
        _q->pop();
153
0
        return true;
154
0
    } else {
155
0
        if (_first_item.timestamp_ms == 0 ||
156
0
            now_ms < _first_item.timestamp_ms + 60000) {
157
0
            return false;
158
0
        }
159
0
        _first_item.timestamp_ms = 0;
160
0
        *popped = _first_item.value;
161
0
        return true;
162
0
    }
163
0
}
164
165
} // namespace brpc
166
167
168
#endif  // BRPC_SPARSE_MINUTE_COUNTER_H