Coverage Report

Created: 2025-08-05 06:45

/src/brpc/src/bvar/collector.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// Date: Mon Dec 14 19:12:30 CST 2015
19
20
#ifndef BVAR_COLLECTOR_H
21
#define BVAR_COLLECTOR_H
22
23
#include "butil/containers/linked_list.h"
24
#include "butil/fast_rand.h"
25
#include "butil/time.h"
26
#include "butil/atomicops.h"
27
#include "bvar/passive_status.h"
28
29
namespace bvar {
30
31
// Sentinel sampling range meaning "do not sample": the fast path of
// is_collectable() returns it for a rejected sample, and
// is_sampling_range_valid() reports it as invalid.
static const size_t INVALID_SAMPLING_RANGE = 0;
32
33
0
// Returns true iff `sampling_range' is positive, i.e. it differs from
// INVALID_SAMPLING_RANGE (0).
inline bool is_sampling_range_valid(size_t sampling_range) {
34
0
    return sampling_range > 0;
35
0
}
36
37
// Context for limiting the sampling speed.
// The struct is initialized positionally by
// BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER, so the order of the fields below
// must stay in sync with that macro.
38
struct CollectorSpeedLimit {
39
    // [Managed by Collector, don't change!]
40
    // Compared by is_collectable() against a random number in
    // [0, COLLECTOR_SAMPLING_BASE). Starts at COLLECTOR_SAMPLING_BASE.
    size_t sampling_range;
41
    // When true, is_collectable() takes its fast path. Starts as false.
    bool ever_grabbed;
42
    // Starts at 0.
    // NOTE(review): presumably counts samples seen before the first grab;
    // its users are outside this header -- confirm.
    butil::static_atomic<int> count_before_grabbed;
43
    // Starts at 0.
    // NOTE(review): presumably the real-clock time (in microseconds) of the
    // first sample; its users are outside this header -- confirm.
    int64_t first_sample_real_us;
44
};
45
46
// Base of the sampling range: the value returned by is_collectable() is
// approximately the sampling probability times this constant.
// Must remain a power of two because is_collectable() reduces random numbers
// with the mask (COLLECTOR_SAMPLING_BASE - 1).
static const size_t COLLECTOR_SAMPLING_BASE = 16384;
47
48
// Brace initializer for a CollectorSpeedLimit. The values are positional:
// sampling_range = COLLECTOR_SAMPLING_BASE, ever_grabbed = false,
// count_before_grabbed = 0, first_sample_real_us = 0.
#define BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER                          \
49
    { ::bvar::COLLECTOR_SAMPLING_BASE, false, BUTIL_STATIC_ATOMIC_INIT(0), 0 }
50
51
// Forward declaration so that CollectorPreprocessor::process() can take
// Collected* before the class is defined.
class Collected;
52
53
// Interface for processing samples in batch before dumping.
// NOTE(review): std::vector is used below but this header does not include
// <vector> directly; presumably it arrives through another include -- confirm.
54
class CollectorPreprocessor {
55
public:
56
0
    virtual ~CollectorPreprocessor() = default;
57
    // Called with a batch of samples. `samples' is passed by non-const
    // reference, so an implementation may modify the batch in place.
    virtual void process(std::vector<Collected*>& samples) = 0;
58
};
59
60
// Steps for sampling and dumping something:
61
//  1. Implement Collected
62
//  2. Create an instance and fill in data.
63
//  3. submit() the instance.
64
class Collected : public butil::LinkNode<Collected> {
65
public:
66
0
    virtual ~Collected() {}
67
    
68
    // Submit the sample for later dumping, a sample can only be submitted once.
69
    // submit() is implemented as writing a value to bvar::Reducer which does
70
    // not compete globally. This function generally does not alter the
71
    // interleaving status of threads even in highly contended situations.
72
    // You should also create the sample using a malloc() impl. that is
73
    // unlikely to contend, keeping interruptions minimal.
74
    // `cpuwide_us' should be got from butil::cpuwide_time_us(). If it's far
75
    // from the timestamp updated by collecting thread(which basically means
76
    // the thread is not scheduled by OS in time), this sample is directly
77
    // destroy()-ed to avoid memory explosion.
78
    void submit(int64_t cpuwide_us);
79
0
    // Convenience overload: submits with the current butil::cpuwide_time_us().
    void submit() { submit(butil::cpuwide_time_us()); }
80
81
    // Implement this method to dump the sample into files and destroy it.
82
    // This method is called in a separate thread and can be blocked
83
    // indefinitely long(not recommended). If too many samples wait for
84
    // this function due to previous sample's blocking, they'll be destroy()-ed.
85
    // If you need to run destruction code upon thread's exit, use
86
    // butil::thread_atexit. Dumping thread runs this function in batch, each
87
    // batch is counted as one "round", `round_index' is the round that
88
    // dumping thread is currently at, counting from 1.
89
    virtual void dump_and_destroy(size_t round_index) = 0;
90
91
    // Destroy the sample. Will be called at most once. Since dumping
92
    // thread generally quits upon the termination of program, some samples
93
    // are directly recycled along with program w/o calling destroy().
94
    virtual void destroy() = 0;
95
96
    // Returns an object to control #samples collected per second.
97
    // If NULL is returned, samples collected per second is limited by a
98
    // global speed limit shared with other samples also returning NULL.
99
    // All instances of a subclass of Collected should return the same instance
100
    // of CollectorSpeedLimit. The instance should remain valid during lifetime
101
    // of program.
102
    virtual CollectorSpeedLimit* speed_limit() = 0;
103
104
    // If this method returns a non-NULL instance, it will be applied to
105
    // samples in batch before dumping. You can sort or shuffle the samples
106
    // in the impl.
107
    // All instances of a subclass of Collected should return the same instance
108
    // of CollectorPreprocessor. The instance should remain valid during
109
    // lifetime of program.
    // The default implementation returns NULL.
110
0
    virtual CollectorPreprocessor* preprocessor() { return NULL; }
111
};
112
113
// To know if an instance should be sampled.
114
// Returns a positive number when the object should be sampled, 0 otherwise.
115
// The number is approximately the current probability of sampling times
116
// COLLECTOR_SAMPLING_BASE, it varies from second to second, being adjusted
117
// by collecting thread to control the samples collected per second.
118
// This function should cost less than 10ns in most cases.
// `speed_limit' is dereferenced unconditionally and must not be NULL.
119
0
inline size_t is_collectable(CollectorSpeedLimit* speed_limit) {
120
0
    if (speed_limit->ever_grabbed) { // most common case
121
0
        const size_t sampling_range = speed_limit->sampling_range;
122
        // fast_rand is faster than fast_rand_in
        // The mask keeps the random number within [0, COLLECTOR_SAMPLING_BASE),
        // which relies on COLLECTOR_SAMPLING_BASE being a power of two.
123
0
        if ((butil::fast_rand() & (COLLECTOR_SAMPLING_BASE - 1)) >= sampling_range) {
124
0
            return INVALID_SAMPLING_RANGE;
125
0
        }
126
0
        return sampling_range;
127
0
    }
128
    // Slower, only runs before -bvar_collector_expected_per_second samples are
129
    // collected to calculate a more reasonable sampling_range for the type.
    // The helper is only declared here (at block scope); its definition is
    // outside this header.
130
0
    extern size_t is_collectable_before_first_time_grabbed(CollectorSpeedLimit*);
131
0
    return is_collectable_before_first_time_grabbed(speed_limit);
132
0
}
133
134
// A utility for displaying current sampling ratio according to speed limit.
// The constructor is only declared here; its definition is outside this
// header.
135
class DisplaySamplingRatio {
136
public:
137
    DisplaySamplingRatio(const char* name, const CollectorSpeedLimit*);
138
private:
139
    // NOTE(review): presumably exposes the ratio computed from the speed limit
    // under `name'; the wiring is in the out-of-line constructor -- confirm.
    bvar::PassiveStatus<double> _var;
140
};
141
142
}  // namespace bvar
143
144
#endif  // BVAR_COLLECTOR_H