/src/brpc/src/bvar/detail/sampler.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // Date: Tue Jul 28 18:14:40 CST 2015 |
19 | | |
20 | | #include <gflags/gflags.h> |
21 | | #include "butil/threading/platform_thread.h" |
22 | | #include "butil/time.h" |
23 | | #include "butil/memory/singleton_on_pthread_once.h" |
24 | | #include "bvar/reducer.h" |
25 | | #include "bvar/detail/sampler.h" |
26 | | #include "bvar/passive_status.h" |
27 | | #include "bvar/window.h" |
28 | | |
29 | | namespace bvar { |
30 | | namespace detail { |
31 | | |
// Number of consecutive sampling rounds that finish without sleeping before
// the sampling thread logs a "busy" warning.
static const int WARN_NOSLEEP_THRESHOLD = 2;
33 | | |
34 | | // Combine two circular linked list into one. |
35 | | struct CombineSampler { |
36 | 10 | void operator()(Sampler* & s1, Sampler* s2) const { |
37 | 10 | if (s2 == NULL) { |
38 | 0 | return; |
39 | 0 | } |
40 | 10 | if (s1 == NULL) { |
41 | 4 | s1 = s2; |
42 | 4 | return; |
43 | 4 | } |
44 | 6 | s1->InsertBeforeAsList(s2); |
45 | 6 | } |
46 | | }; |
47 | | |
// Guards the one-time pthread_atfork registration of the child callback.
// A callback registered through pthread_atfork also runs in children of a
// child, so a forked process never has to register it again.
static bool registered_atfork = false;
51 | | |
52 | | // Call take_sample() of all scheduled samplers. |
53 | | // This can be done with a regular timer thread, but it's way too slow (global |
54 | | // contention + log(N) heap manipulations). We need it to be super fast so that |
55 | | // the creation overhead of Window<> is negligible. |
56 | | // The trick is to use Reducer<Sampler*, CombineSampler>. Each Sampler is |
57 | | // doubly linked, thus we can reduce multiple Samplers into one circularly |
58 | | // doubly linked list, and multiple lists into larger lists. We create a |
59 | | // dedicated thread to periodically get_value() which is just the combined |
60 | | // list of Samplers, then walk through the list and call take_sample(). |
61 | | // If a Sampler needs to be deleted, we just mark it as unused and the |
62 | | // deletion takes place in the thread as well. |
63 | | class SamplerCollector : public bvar::Reducer<Sampler*, CombineSampler> { |
64 | | public: |
65 | | SamplerCollector() |
66 | 2 | : _created(false) |
67 | 2 | , _stop(false) |
68 | 2 | , _cumulated_time_us(0) { |
69 | 2 | create_sampling_thread(); |
70 | 2 | } |
71 | 0 | ~SamplerCollector() { |
72 | 0 | if (_created) { |
73 | 0 | _stop = true; |
74 | 0 | pthread_join(_tid, NULL); |
75 | 0 | _created = false; |
76 | 0 | } |
77 | 0 | } |
78 | | |
79 | | private: |
80 | | // Support for fork: |
81 | | // * The singleton can be null before forking, the child callback will not |
82 | | // be registered. |
83 | | // * If the singleton is not null before forking, the child callback will |
84 | | // be registered and the sampling thread will be re-created. |
85 | | // * A forked program can be forked again. |
86 | | |
87 | 0 | static void child_callback_atfork() { |
88 | 0 | butil::get_leaky_singleton<SamplerCollector>()->after_forked_as_child(); |
89 | 0 | } |
90 | | |
91 | 2 | void create_sampling_thread() { |
92 | 2 | const int rc = pthread_create(&_tid, NULL, sampling_thread, this); |
93 | 2 | if (rc != 0) { |
94 | 0 | LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc); |
95 | 2 | } else { |
96 | 2 | _created = true; |
97 | 2 | if (!registered_atfork) { |
98 | 2 | registered_atfork = true; |
99 | 2 | pthread_atfork(NULL, NULL, child_callback_atfork); |
100 | 2 | } |
101 | 2 | } |
102 | 2 | } |
103 | | |
104 | 0 | void after_forked_as_child() { |
105 | 0 | _created = false; |
106 | 0 | create_sampling_thread(); |
107 | 0 | } |
108 | | |
109 | | void run(); |
110 | | |
111 | 2 | static void* sampling_thread(void* arg) { |
112 | 2 | butil::PlatformThread::SetNameSimple("bvar_sampler"); |
113 | 2 | static_cast<SamplerCollector*>(arg)->run(); |
114 | 2 | return NULL; |
115 | 2 | } |
116 | | |
117 | 0 | static double get_cumulated_time(void* arg) { |
118 | 0 | return static_cast<SamplerCollector*>(arg)->_cumulated_time_us / 1000.0 / 1000.0; |
119 | 0 | } |
120 | | |
121 | | private: |
122 | | bool _created; |
123 | | bool _stop; |
124 | | int64_t _cumulated_time_us; |
125 | | pthread_t _tid; |
126 | | }; |
127 | | |
128 | | #ifndef UNIT_TEST |
129 | | static PassiveStatus<double>* s_cumulated_time_bvar = NULL; |
130 | | static bvar::PerSecond<bvar::PassiveStatus<double> >* s_sampling_thread_usage_bvar = NULL; |
131 | | #endif |
132 | | |
133 | | DEFINE_int32(bvar_sampler_thread_start_delay_us, 10000, "bvar sampler thread start delay us"); |
134 | | |
135 | 2 | void SamplerCollector::run() { |
136 | 2 | ::usleep(FLAGS_bvar_sampler_thread_start_delay_us); |
137 | | |
138 | | #ifndef UNIT_TEST |
139 | | // NOTE: |
140 | | // * Following vars can't be created on thread's stack since this thread |
141 | | // may be abandoned at any time after forking. |
142 | | // * They can't created inside the constructor of SamplerCollector as well, |
143 | | // which results in deadlock. |
144 | | if (s_cumulated_time_bvar == NULL) { |
145 | | s_cumulated_time_bvar = |
146 | | new PassiveStatus<double>(get_cumulated_time, this); |
147 | | } |
148 | | if (s_sampling_thread_usage_bvar == NULL) { |
149 | | s_sampling_thread_usage_bvar = |
150 | | new bvar::PerSecond<bvar::PassiveStatus<double> >( |
151 | | "bvar_sampler_collector_usage", s_cumulated_time_bvar, 10); |
152 | | } |
153 | | #endif |
154 | | |
155 | 2 | butil::LinkNode<Sampler> root; |
156 | 2 | int consecutive_nosleep = 0; |
157 | 4 | while (!_stop) { |
158 | 2 | int64_t abstime = butil::gettimeofday_us(); |
159 | 2 | Sampler* s = this->reset(); |
160 | 2 | if (s) { |
161 | 2 | s->InsertBeforeAsList(&root); |
162 | 2 | } |
163 | 10 | for (butil::LinkNode<Sampler>* p = root.next(); p != &root;) { |
164 | | // We may remove p from the list, save next first. |
165 | 8 | butil::LinkNode<Sampler>* saved_next = p->next(); |
166 | 8 | Sampler* s = p->value(); |
167 | 8 | s->_mutex.lock(); |
168 | 8 | if (!s->_used) { |
169 | 0 | s->_mutex.unlock(); |
170 | 0 | p->RemoveFromList(); |
171 | 0 | delete s; |
172 | 8 | } else { |
173 | 8 | s->take_sample(); |
174 | 8 | s->_mutex.unlock(); |
175 | 8 | } |
176 | 8 | p = saved_next; |
177 | 8 | } |
178 | 2 | bool slept = false; |
179 | 2 | int64_t now = butil::gettimeofday_us(); |
180 | 2 | _cumulated_time_us += now - abstime; |
181 | 2 | abstime += 1000000L; |
182 | 4 | while (abstime > now) { |
183 | 2 | ::usleep(abstime - now); |
184 | 2 | slept = true; |
185 | 2 | now = butil::gettimeofday_us(); |
186 | 2 | } |
187 | 2 | if (slept) { |
188 | 0 | consecutive_nosleep = 0; |
189 | 2 | } else { |
190 | 2 | if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) { |
191 | 0 | consecutive_nosleep = 0; |
192 | 0 | LOG(WARNING) << "bvar is busy at sampling for " |
193 | 0 | << WARN_NOSLEEP_THRESHOLD << " seconds!"; |
194 | 0 | } |
195 | 2 | } |
196 | 2 | } |
197 | 2 | } |
198 | | |
199 | 8 | Sampler::Sampler() : _used(true) {} |
200 | | |
201 | 0 | Sampler::~Sampler() {} |
202 | | |
203 | | DEFINE_bool(bvar_enable_sampling, true, "is enable bvar sampling"); |
204 | | |
205 | 8 | void Sampler::schedule() { |
206 | | // since the SamplerCollector is initialized before the program starts |
207 | | // flags will not take effect if used in the SamplerCollector constructor |
208 | 8 | if (FLAGS_bvar_enable_sampling) { |
209 | 8 | *butil::get_leaky_singleton<SamplerCollector>() << this; |
210 | 8 | } |
211 | 8 | } |
212 | | |
213 | 0 | void Sampler::destroy() { |
214 | 0 | _mutex.lock(); |
215 | 0 | _used = false; |
216 | 0 | _mutex.unlock(); |
217 | 0 | } |
218 | | |
219 | | } // namespace detail |
220 | | } // namespace bvar |