Coverage Report

Created: 2026-01-10 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bvar/detail/sampler.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// Date: Tue Jul 28 18:14:40 CST 2015
19
20
#include <gflags/gflags.h>
21
#include "butil/threading/platform_thread.h"
22
#include "butil/time.h"
23
#include "butil/memory/singleton_on_pthread_once.h"
24
#include "bvar/reducer.h"
25
#include "bvar/detail/sampler.h"
26
#include "bvar/passive_status.h"
27
#include "bvar/window.h"
28
29
namespace bvar {
30
namespace detail {
31
32
// Number of consecutive sampling rounds that finish without sleeping at all
// before the sampling thread logs a "busy" warning.
const int WARN_NOSLEEP_THRESHOLD = 2;
33
34
// Combine two circular linked lists into one.
35
struct CombineSampler {
36
10
    void operator()(Sampler* & s1, Sampler* s2) const {
37
10
        if (s2 == NULL) {
38
0
            return;
39
0
        }
40
10
        if (s1 == NULL) {
41
4
            s1 = s2;
42
4
            return;
43
4
        }
44
6
        s1->InsertBeforeAsList(s2);
45
6
    }
46
};
47
48
// True iff pthread_atfork was called. The callback to atfork works for child
49
// of child as well, no need to register in the child again.
50
static bool registered_atfork = false;
51
52
// Call take_sample() of all scheduled samplers.
53
// This can be done with a regular timer thread, but it's way too slow (global
54
// contention + log(N) heap manipulations). We need it to be super fast so that
55
// creation overhead of Window<> is negligible.
56
// The trick is to use Reducer<Sampler*, CombineSampler>. Each Sampler is
57
// doubly linked, thus we can reduce multiple Samplers into one circularly
58
// doubly linked list, and multiple lists into larger lists. We create a
59
// dedicated thread to periodically get_value() which is just the combined
60
// list of Samplers. It walks through the list and calls take_sample().
61
// If a Sampler needs to be deleted, we just mark it as unused and the
62
// deletion takes place in that thread as well.
63
// Reducer that gathers scheduled Samplers and owns the background thread
// which periodically samples them (see run()).
// NOTE(review): _stop is a plain bool written by the destructor and read by
// the sampling thread, and _cumulated_time_us is written by the sampling
// thread and presumably read elsewhere through get_cumulated_time(). Neither
// is atomic -- confirm this is acceptable on the supported platforms.
class SamplerCollector : public bvar::Reducer<Sampler*, CombineSampler> {
public:
    // Creates the sampling thread immediately.
    SamplerCollector()
        : _created(false)
        , _stop(false)
        , _cumulated_time_us(0) {
        create_sampling_thread();
    }

    // Asks the sampling thread to stop and joins it, if one was created.
    ~SamplerCollector() {
        if (!_created) {
            return;
        }
        _stop = true;
        pthread_join(_tid, NULL);
        _created = false;
    }

private:
    // Support for fork:
    // * The singleton can be null before forking, the child callback will not
    //   be registered.
    // * If the singleton is not null before forking, the child callback will
    //   be registered and the sampling thread will be re-created.
    // * A forked program can be forked again.

    // Child-side atfork handler: re-creates the sampling thread in the child.
    static void child_callback_atfork() {
        butil::get_leaky_singleton<SamplerCollector>()->after_forked_as_child();
    }

    // Starts the sampling thread and, the first time this succeeds, registers
    // the child-side atfork handler.
    void create_sampling_thread() {
        const int rc = pthread_create(&_tid, NULL, sampling_thread, this);
        if (rc != 0) {
            LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
            return;
        }
        _created = true;
        if (registered_atfork) {
            return;
        }
        // Only remember the registration when it actually succeeded, and make
        // a failure visible instead of silently losing sampling in children.
        const int atfork_rc = pthread_atfork(NULL, NULL, child_callback_atfork);
        if (atfork_rc != 0) {
            LOG(ERROR) << "Fail to register atfork callback of sampling_thread, "
                       << berror(atfork_rc);
            return;
        }
        registered_atfork = true;
    }

    // Runs in the forked child: forget the old thread state and start a new
    // sampling thread.
    void after_forked_as_child() {
        _created = false;
        create_sampling_thread();
    }

    // Body of the sampling thread, defined out of line.
    void run();

    // pthread entry point; `arg` is the SamplerCollector to run.
    static void* sampling_thread(void* arg) {
        butil::PlatformThread::SetNameSimple("bvar_sampler");
        static_cast<SamplerCollector*>(arg)->run();
        return NULL;
    }

    // Returns the accumulated sampling time of the collector passed as `arg`,
    // converted from microseconds to seconds.
    static double get_cumulated_time(void* arg) {
        return static_cast<SamplerCollector*>(arg)->_cumulated_time_us / 1000.0 / 1000.0;
    }

    bool _created;               // true once the sampling thread was created
    bool _stop;                  // set by the destructor to end run()
    int64_t _cumulated_time_us;  // total time spent in sampling rounds
    pthread_t _tid;              // id of the sampling thread
};
127
128
#ifndef UNIT_TEST
129
static PassiveStatus<double>* s_cumulated_time_bvar = NULL;
130
static bvar::PerSecond<bvar::PassiveStatus<double> >* s_sampling_thread_usage_bvar = NULL;
131
#endif
132
133
DEFINE_int32(bvar_sampler_thread_start_delay_us, 10000, "bvar sampler thread start delay us");
134
135
2
void SamplerCollector::run() {
136
2
    ::usleep(FLAGS_bvar_sampler_thread_start_delay_us);
137
    
138
#ifndef UNIT_TEST
139
    // NOTE:
140
    // * Following vars can't be created on thread's stack since this thread
141
    //   may be abandoned at any time after forking.
142
    // * They can't created inside the constructor of SamplerCollector as well,
143
    //   which results in deadlock.
144
    if (s_cumulated_time_bvar == NULL) {
145
        s_cumulated_time_bvar =
146
            new PassiveStatus<double>(get_cumulated_time, this);
147
    }
148
    if (s_sampling_thread_usage_bvar == NULL) {
149
        s_sampling_thread_usage_bvar =
150
            new bvar::PerSecond<bvar::PassiveStatus<double> >(
151
                    "bvar_sampler_collector_usage", s_cumulated_time_bvar, 10);
152
    }
153
#endif
154
155
2
    butil::LinkNode<Sampler> root;
156
2
    int consecutive_nosleep = 0;
157
4
    while (!_stop) {
158
2
        int64_t abstime = butil::gettimeofday_us();
159
2
        Sampler* s = this->reset();
160
2
        if (s) {
161
2
            s->InsertBeforeAsList(&root);
162
2
        }
163
10
        for (butil::LinkNode<Sampler>* p = root.next(); p != &root;) {
164
            // We may remove p from the list, save next first.
165
8
            butil::LinkNode<Sampler>* saved_next = p->next();
166
8
            Sampler* s = p->value();
167
8
            s->_mutex.lock();
168
8
            if (!s->_used) {
169
0
                s->_mutex.unlock();
170
0
                p->RemoveFromList();
171
0
                delete s;
172
8
            } else {
173
8
                s->take_sample();
174
8
                s->_mutex.unlock();
175
8
            }
176
8
            p = saved_next;
177
8
        }
178
2
        bool slept = false;
179
2
        int64_t now = butil::gettimeofday_us();
180
2
        _cumulated_time_us += now - abstime;
181
2
        abstime += 1000000L;
182
4
        while (abstime > now) {
183
2
            ::usleep(abstime - now);
184
2
            slept = true;
185
2
            now = butil::gettimeofday_us();
186
2
        }
187
2
        if (slept) {
188
0
            consecutive_nosleep = 0;
189
2
        } else {            
190
2
            if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) {
191
0
                consecutive_nosleep = 0;
192
0
                LOG(WARNING) << "bvar is busy at sampling for "
193
0
                             << WARN_NOSLEEP_THRESHOLD << " seconds!";
194
0
            }
195
2
        }
196
2
    }
197
2
}
198
199
8
// A new sampler starts out marked as in use.
Sampler::Sampler()
    : _used(true) {
}
200
201
0
// Nothing to release here.
Sampler::~Sampler() {
}
202
203
DEFINE_bool(bvar_enable_sampling, true, "is enable bvar sampling");
204
205
8
void Sampler::schedule() {
206
    // since the SamplerCollector is initialized before the program starts
207
    // flags will not take effect if used in the SamplerCollector constructor
208
8
    if (FLAGS_bvar_enable_sampling) {
209
8
        *butil::get_leaky_singleton<SamplerCollector>() << this;
210
8
    }
211
8
}
212
213
0
void Sampler::destroy() {
214
0
    _mutex.lock();
215
0
    _used = false;
216
0
    _mutex.unlock();
217
0
}
218
219
}  // namespace detail
220
}  // namespace bvar