/src/brpc/src/bthread/butex.cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - An M:N threading library to make applications more concurrent.

// Date: Tue Jul 22 17:30:12 CST 2014

#include "butil/atomicops.h"                // butil::atomic
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
#include "butil/macros.h"
#include "butil/containers/flat_map.h"
#include "butil/containers/linked_list.h"   // LinkNode
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
#include "butil/memory/singleton_on_pthread_once.h"
#endif
#include "butil/logging.h"
#include "butil/object_pool.h"
#include "bthread/errno.h"                  // EWOULDBLOCK
#include "bthread/sys_futex.h"              // futex_*
#include "bthread/processor.h"              // cpu_relax
#include "bthread/task_control.h"           // TaskControl
#include "bthread/task_group.h"             // TaskGroup
#include "bthread/timer_thread.h"
#include "bthread/butex.h"
#include "bthread/mutex.h"

// This file implements butex.h.
// Provides futex-like semantics: sequenced wait and wake operations with
// guaranteed visibility.
//
// If wait is sequenced before wake:
//    [thread1]             [thread2]
//    wait()                value = new_value
//                          wake()
// wait() sees the unmatched value (fails to wait), or wake() sees the waiter.
//
// If wait is sequenced after wake:
//    [thread1]             [thread2]
//                          value = new_value
//                          wake()
//    wait()
// wake() must provide some sort of memory fence to prevent the assignment
// of value from being reordered after it. Thus the value is visible to
// wait() as well.
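//
// A minimal usage sketch, added for illustration (the calls match the
// definitions in this file; error handling is omitted):
//
//   void* b = butex_create();   // `b' points at a butil::atomic<int>
//   butil::atomic<int>* v = static_cast<butil::atomic<int>*>(b);
//   v->store(0, butil::memory_order_relaxed);
//   // Waiter: blocks while *v still equals the expected value 0.
//   butex_wait(b, 0/*expected_value*/, NULL/*abstime*/, false/*prepend*/);
//   // Waker, in another thread:
//   v->store(1, butil::memory_order_release);
//   butex_wake(b, false/*nosignal*/);
//   butex_destroy(b);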

namespace bthread {

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
struct ButexWaiterCount : public bvar::Adder<int64_t> {
    ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {}
};
inline bvar::Adder<int64_t>& butex_waiter_count() {
    return *butil::get_leaky_singleton<ButexWaiterCount>();
}
#endif

enum WaiterState {
    WAITER_STATE_NONE,
    WAITER_STATE_READY,
    WAITER_STATE_TIMEDOUT,
    WAITER_STATE_UNMATCHEDVALUE,
    WAITER_STATE_INTERRUPTED,
};

struct Butex;

struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
    // tids of pthreads are 0
    bthread_t tid;

    // Erasing a node from the middle of a LinkedList is thread-unsafe; we
    // need to hold its container's lock.
    butil::atomic<Butex*> container;
};

// non_pthread_task allocates this structure on stack and queues it in
// Butex::waiters.
struct ButexBthreadWaiter : public ButexWaiter {
    TaskMeta* task_meta;
    TimerThread::TaskId sleep_id;
    WaiterState waiter_state;
    int expected_value;
    Butex* initial_butex;
    TaskControl* control;
    const timespec* abstime;
    bthread_tag_t tag;
};

// pthread_task or main_task allocates this structure on stack and queues it
// in Butex::waiters.
struct ButexPthreadWaiter : public ButexWaiter {
    butil::atomic<int> sig;
};

typedef butil::LinkedList<ButexWaiter> ButexWaiterList;

enum ButexPthreadSignal { PTHREAD_NOT_SIGNALLED, PTHREAD_SIGNALLED };

struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
    ~Butex() {}

    butil::atomic<int> value;
    ButexWaiterList waiters;
    FastPthreadMutex waiter_lock;
};

BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline);
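// (Added note: the first assert keeps &Butex::value interchangeable with the
// Butex address itself, since butex_create() hands out &value as the
// user-visible handle; the second keeps two Butexes from sharing a cache line.)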

} // namespace bthread

namespace butil {
// A Butex object returned to ObjectPool<Butex> may still be accessed, so
// ObjectPool<Butex> cannot poison the memory region of Butex.
template <>
struct ObjectPoolWithASanPoison<bthread::Butex> : false_type {};
} // namespace butil

namespace bthread {

static void wakeup_pthread(ButexPthreadWaiter* pw) {
    // The release fence makes wait_pthread() see changes before wakeup.
    pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
    // At this point, wait_pthread() has possibly woken up and destroyed `pw',
    // in which case futex_wake_private() should return EFAULT.
    // If crashes show up here in the future, `pw' can be made TLS and never
    // destroyed to solve the issue.
    futex_wake_private(&pw->sig, 1);
}

bool erase_from_butex(ButexWaiter*, bool, WaiterState);

int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) {
    timespec* ptimeout = NULL;
    timespec timeout;
    int64_t timeout_us = 0;
    int rc;

    while (true) {
        if (abstime != NULL) {
            timeout_us = butil::timespec_to_microseconds(*abstime) - butil::gettimeofday_us();
            timeout = butil::microseconds_to_timespec(timeout_us);
            ptimeout = &timeout;
        }
        if (timeout_us > MIN_SLEEP_US || abstime == NULL) {
            rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
            if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
                // If `sig' is changed, wakeup_pthread() must have been called
                // and `pw' is already removed from the butex.
                // The acquire fence makes this thread see changes before wakeup.
                return rc;
            }
        } else {
            errno = ETIMEDOUT;
            rc = -1;
        }
        // Handle ETIMEDOUT when abstime is valid.
        // If futex_wait_private() returns EINTR, just continue the loop.
        if (rc != 0 && errno == ETIMEDOUT) {
            // The futex wait timed out and `pw' is still in the queue; remove it.
            if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
                // Another thread is erasing `pw' as well; wait for its signal.
                // The acquire fence makes this thread see changes before wakeup.
                if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
                    // Already timed out; abstime and ptimeout have expired.
                    abstime = NULL;
                    ptimeout = NULL;
                    continue;
                }
            }
            return rc;
        }
    }
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;

// Returns 0 when there is no need to unschedule or the unschedule succeeded,
// -1 otherwise.
inline int unsleep_if_necessary(ButexBthreadWaiter* w,
                                TimerThread* timer_thread) {
    if (!w->sleep_id) {
        return 0;
    }
    if (timer_thread->unschedule(w->sleep_id) > 0) {
        // The callback is running.
        return -1;
    }
    w->sleep_id = 0;
    return 0;
}

// Use ObjectPool (which never frees memory) to solve the race between
// butex_wake() and butex_destroy(). The race is as follows:
//
//   class Event {
//   public:
//     void wait() {
//       _mutex.lock();
//       if (!_done) {
//         _cond.wait(&_mutex);
//       }
//       _mutex.unlock();
//     }
//     void signal() {
//       _mutex.lock();
//       if (!_done) {
//         _done = true;
//         _cond.signal();
//       }
//       _mutex.unlock();  /*1*/
//     }
//   private:
//     bool _done = false;
//     Mutex _mutex;
//     Condition _cond;
//   };
//
//   [Thread1]                          [Thread2]
//   foo() {
//     Event event;
//     pass_to_thread2(&event);  --->   event.signal();
//     event.wait();
//   } <-- event destroyed
//
// Summary: Thread1 passes a stateful condition to Thread2 and waits until
// the condition is signalled, which basically means the associated job is
// done and Thread1 can release related resources including the mutex and
// condition. The scenario is fine and the code is correct.
// The race needs a closer look. The unlock at /*1*/ may have different
// implementations, but the last step is probably an atomic store plus a
// butex_wake(), like this:
//
//   locked->store(0);
//   butex_wake(locked);
//
// `locked' represents the locking status of the mutex. The issue is that
// just after the store(), the mutex is already unlocked and the code in
// Event::wait() may successfully grab the lock, go through everything left,
// and leave foo(), destroying the mutex and the butex and making the
// butex_wake(locked) crash.
// To solve this issue, one method is to add a reference before the store and
// release the reference after butex_wake(). However, reference counting
// would need to be added in nearly every usage scenario of butex_wake(),
// which is very error-prone. Another method is to never free butexes, with
// the side effect that butex_wake() may wake up an unrelated butex (one that
// reuses the memory) and cause spurious wakeups. According to our
// observations, the race is infrequent, even rare. The extra spurious
// wakeups should be acceptable.
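//
// A practical consequence, sketched here for illustration (not part of the
// original comment): since butex memory may be reused, a woken waiter cannot
// assume its condition holds and should re-check the value in a loop:
//
//   butil::atomic<int>* v = static_cast<butil::atomic<int>*>(butex);
//   while (v->load(butil::memory_order_acquire) == expected_value) {
//       butex_wait(v, expected_value, NULL/*abstime*/, false/*prepend*/);
//   }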

void* butex_create() {
    Butex* b = butil::get_object<Butex>();
    if (b) {
        return &b->value;
    }
    return NULL;
}

void butex_destroy(void* butex) {
    if (!butex) {
        return;
    }
    Butex* b = static_cast<Butex*>(
        container_of(static_cast<butil::atomic<int>*>(butex), Butex, value));
    butil::return_object(b);
}

// Returns true if tls_task_group is non-NULL and its tag equals `tag'.
inline bool is_same_tag(bthread_tag_t tag) {
    return tls_task_group && tls_task_group->tag() == tag;
}

// Returns true only when nosignal is true and tls_task_group has the same tag.
inline bool check_nosignal(bool nosignal, bthread_tag_t tag) {
    return nosignal && is_same_tag(tag);
}

// Returns tls_task_group if its tag matches, otherwise chooses one group
// with `tag'.
inline TaskGroup* get_task_group(TaskControl* c, bthread_tag_t tag) {
    return is_same_tag(tag) ? tls_task_group : c->choose_one_group(tag);
}

inline void run_in_local_task_group(TaskGroup* g, TaskMeta* next_meta, bool nosignal) {
    if (!nosignal) {
        TaskGroup::exchange(&g, next_meta);
    } else {
        g->ready_to_run(next_meta, nosignal);
    }
}


int butex_wake(void* arg, bool nosignal) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);
    }
    if (front->tid == 0) {
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = get_task_group(bbw->control, bbw->tag);
    if (g == tls_task_group) {
        run_in_local_task_group(g, bbw->task_meta, nosignal);
    } else {
        g->ready_to_run_remote(bbw->task_meta, check_nosignal(nosignal, g->tag()));
    }
    return 1;
}

int butex_wake_n(void* arg, size_t n, bool nosignal) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                bthread_waiters.Append(bw);
            } else {
                pthread_waiters.Append(bw);
            }
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups;
    nwakeups.init(FLAGS_task_group_ntags);
    // We will exchange with the first waiter in the end.
    ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());
    next->RemoveFromList();
    unsleep_if_necessary(next, get_global_timer_thread());
    ++nwakeup;
    while (!bthread_waiters.empty()) {
        // Pop from the tail, i.e. in reverse order.
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        auto g = get_task_group(w->control, w->tag);
        g->ready_to_run_general(w->task_meta, true);
        nwakeups[g->tag()] = g;
        ++nwakeup;
    }
    for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) {
        auto g = it->second;
        if (!check_nosignal(nosignal, g->tag())) {
            g->flush_nosignal_tasks_general();
        }
    }
    auto g = get_task_group(next->control, next->tag);
    if (g == tls_task_group) {
        run_in_local_task_group(g, next->task_meta, nosignal);
    } else {
        g->ready_to_run_remote(next->task_meta, check_nosignal(nosignal, g->tag()));
    }
    return nwakeup;
}

int butex_wake_all(void* arg, bool nosignal) {
    return butex_wake_n(arg, 0, nosignal);
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
                    bw->container.store(NULL, butil::memory_order_relaxed);
                } else {
                    excluded_waiter = bw;
                }
            } else {
                bw->container.store(NULL, butil::memory_order_relaxed);
                pthread_waiters.Append(bw);
            }
        }

        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter);
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }

    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups;
    nwakeups.init(FLAGS_task_group_ntags);
    do {
        // Pop from the tail, i.e. in reverse order.
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        auto g = get_task_group(w->control, w->tag);
        g->ready_to_run_general(w->task_meta, true);
        nwakeups[g->tag()] = g;
        ++nwakeup;
    } while (!bthread_waiters.empty());
    for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) {
        auto g = it->second;
        g->flush_nosignal_tasks_general();
    }
    return nwakeup;
}

int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);

    ButexWaiter* front = NULL;
    {
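        // Note (added): butil::double_lock acquires the two mutexes in a
        // consistent order regardless of argument order, so two concurrent
        // requeues between the same pair of butexes cannot deadlock.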
        std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        butil::double_lock(lck1, lck2);
        if (b->waiters.empty()) {
            return 0;
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);

        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }

    if (front->tid == 0) {  // front is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    auto g = is_same_tag(bbw->tag) ? tls_task_group : NULL;
    if (g) {
        TaskGroup::exchange(&g, bbw->task_meta);
    } else {
        bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(bbw->task_meta);
    }
    return 1;
}

// Callable from multiple threads; at most one thread may wake up the waiter.
static void erase_from_butex_and_wakeup(void* arg) {
    erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}

// Used in task_group.cpp
bool erase_from_butex_because_of_interruption(ButexWaiter* bw) {
    return erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED);
}

inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    // `bw' is guaranteed to be valid inside this function because the waiter
    // waits until this function is cancelled or finishes.
    // NOTE: This function must be a no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // The container can be NULL when the waiter is scheduled but not
        // queued, which terminates this loop.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control, bbw->tag)->ready_to_run_general(bbw->task_meta);
        } else {
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}

struct WaitForButexArgs {
    ButexBthreadWaiter* bw;
    bool prepend;
};

void wait_for_butex(void* arg) {
    auto args = static_cast<WaitForButexArgs*>(arg);
    ButexBthreadWaiter* const bw = args->bw;
    Butex* const b = bw->initial_butex;
    // 1: A waiter with a timeout should have waiter_state == WAITER_STATE_READY
    //    before it's queued; otherwise the waiter has already timed out and
    //    been removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
    //    [bthread]                           [TimerThread]
    //    waiter_state = TIMED
    //    tt_lock { add task }
    //                                        tt_lock { get task }
    //                                        waiter_lock { waiter_state=TIMEDOUT }
    //    waiter_lock { use waiter_state }
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by the two locks; both threads are guaranteed to see the
    // correct value.
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                   !bw->task_meta->interrupted) {
            if (args->prepend) {
                b->waiters.Prepend(bw);
            } else {
                b->waiters.Append(bw);
            }
            bw->container.store(b, butil::memory_order_relaxed);
#ifdef BRPC_BTHREAD_TRACER
            bw->control->_task_tracer.set_status(TASK_STATUS_SUSPENDED, bw->task_meta);
#endif // BRPC_BTHREAD_TRACER
            if (bw->abstime != NULL) {
                bw->sleep_id = get_global_timer_thread()->schedule(
                    erase_from_butex_and_wakeup, bw, *bw->abstime);
                if (!bw->sleep_id) {  // TimerThread stopped.
                    errno = ESTOP;
                    erase_from_butex_and_wakeup(bw);
                }
            }
            return;
        }
    }

    // bw->container is NULL, which makes erase_from_butex_and_wakeup() and
    // TaskGroup::interrupt() no-ops; there's no race between the following
    // code and the two functions. The on-stack ButexBthreadWaiter is safe to
    // use and bw->waiter_state will not change again.
    // unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->task_meta);
    // FIXME: jumping back to the original bthread is buggy.

    // // Value unmatched or waiter already woken up by TimerThread; jump
    // // back to the original bthread.
    // TaskGroup* g = tls_task_group;
    // ReadyToRunArgs args = { g->current_tid(), false };
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
    // // 2: Don't run remained because we're already in a remained function,
    // //    otherwise the stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                   const timespec* abstime, bool prepend) {
    TaskMeta* task = NULL;
    ButexPthreadWaiter pw;
    pw.tid = 0;
    pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
    int rc = 0;

    if (g) {
        task = g->current_task();
        task->current_waiter.store(&pw, butil::memory_order_release);
    }
    b->waiter_lock.lock();
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        b->waiter_lock.unlock();
        errno = EWOULDBLOCK;
        rc = -1;
    } else if (task != NULL && task->interrupted) {
        b->waiter_lock.unlock();
        // Races with the setter and may consume multiple interruptions,
        // which is OK.
        task->interrupted = false;
        errno = EINTR;
        rc = -1;
    } else {
        if (prepend) {
            b->waiters.Prepend(&pw);
        } else {
            b->waiters.Append(&pw);
        }
        pw.container.store(b, butil::memory_order_relaxed);
        b->waiter_lock.unlock();

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
        num_waiters << 1;
#endif
        rc = wait_pthread(pw, abstime);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        num_waiters << -1;
#endif
    }
    if (task) {
        // If current_waiter is NULL, TaskGroup::interrupt() is running and
        // using pw; spin until current_waiter != NULL.
        BT_LOOP_WHEN(task->current_waiter.exchange(
                         NULL, butil::memory_order_acquire) == NULL,
                     30/*nops before sched_yield*/);
        if (task->interrupted) {
            task->interrupted = false;
            if (rc == 0) {
                errno = EINTR;
                return -1;
            }
        }
    }
    return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after an unmatched butex;
        // this fence makes sure that we see changes made before the butex
        // changed.
        butil::atomic_thread_fence(butil::memory_order_acquire);
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
    }
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is a non-bthread
    bbw.tid = g->current_tid();
    bbw.container.store(NULL, butil::memory_order_relaxed);
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
    bbw.waiter_state = WAITER_STATE_READY;
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();
    bbw.abstime = abstime;
    bbw.tag = g->tag();

    if (abstime != NULL) {
        // Schedule the timer before queueing. If the timer is triggered
        // before queueing, cancel queueing. This is a kind of optimistic
        // locking.
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
            // Already timed out.
            errno = ETIMEDOUT;
            return -1;
        }
    }
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
    num_waiters << 1;
#endif

    // The release fence matches the acquire fence in
    // interrupt_and_consume_waiters in task_group.cpp to guarantee
    // visibility of `interrupted'.
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
    WaitForButexArgs args{ &bbw, prepend };
    g->set_remained(wait_for_butex, &args);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small; just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);

    // If current_waiter is NULL, TaskGroup::interrupt() is running and using
    // bbw. Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                     NULL, butil::memory_order_acquire) == NULL,
                 30/*nops before sched_yield*/);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    num_waiters << -1;
#endif

    bool is_interrupted = false;
    if (bbw.task_meta->interrupted) {
        // Races with the setter and may consume multiple interruptions,
        // which is OK.
        bbw.task_meta->interrupted = false;
        is_interrupted = true;
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
    } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
        errno = EWOULDBLOCK;
        return -1;
    } else if (is_interrupted) {
        errno = EINTR;
        return -1;
    }
    return 0;
}

} // namespace bthread

namespace butil {
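// Cap each ObjectPool<Butex> memory block at 128 items; every Butex occupies
// a full cache line (see the BAIDU_CASSERT above), so this bounds the size
// of one allocation block.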
template <> struct ObjectPoolBlockMaxItem<bthread::Butex> {
    static const size_t value = 128;
};
} // namespace butil