Coverage Report

Created: 2025-12-02 06:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/work_stealing_queue.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Tue Jul 10 17:40:58 CST 2012
21
22
#ifndef BTHREAD_WORK_STEALING_QUEUE_H
23
#define BTHREAD_WORK_STEALING_QUEUE_H
24
25
#include "butil/macros.h"
26
#include "butil/atomicops.h"
27
#include "butil/logging.h"
28
29
namespace bthread {
30
31
// A bounded, lock-free work-stealing deque (single owner, many thieves).
// The pattern matches the Chase-Lev deque: the owner thread pushes and pops
// at `_bottom`; other threads steal from `_top` with a CAS. Capacity is a
// fixed power of 2 so indices can be masked with `_capacity - 1`.
//
// Thread-safety contract (from the per-method comments below):
//   - push()/pop() are owner-only; they may run in parallel with steal()
//     but never with each other or themselves.
//   - steal() may run from any thread, in parallel with everything.
//
// NOTE(review): steal() copies `_buffer[t & mask]` before its CAS; a stale
// thief can read a slot that push() is concurrently rewriting after the ring
// wraps. The CAS failure discards the torn value, but this presumes T is
// trivially copyable (bthread uses it with integer ids) — confirm for new T.
template <typename T>
class WorkStealingQueue {
public:
    // _bottom == _top means empty; starting both at 1 keeps the unsigned
    // arithmetic (b - 1 in pop) away from wrap-around at 0.
    WorkStealingQueue()
        : _bottom(1)
        , _capacity(0)
        , _buffer(NULL)
        , _top(1) {
    }

    // Not thread-safe: callers must ensure no concurrent push/pop/steal
    // when the queue is destroyed.
    ~WorkStealingQueue() {
        delete [] _buffer;
        _buffer = NULL;
    }

    // Allocate the ring buffer. `capacity` must be a non-zero power of 2.
    // Returns 0 on success, -1 on error (already initialized, bad capacity,
    // or allocation failure). Call once, before any push/pop/steal.
    int init(size_t capacity) {
        if (_capacity != 0) {
            LOG(ERROR) << "Already initialized";
            return -1;
        }
        if (capacity == 0) {
            LOG(ERROR) << "Invalid capacity=" << capacity;
            return -1;
        }
        // A power of 2 has a single bit set, so n & (n - 1) == 0.
        if (capacity & (capacity - 1)) {
            LOG(ERROR) << "Invalid capacity=" << capacity
                       << " which must be power of 2";
            return -1;
        }
        _buffer = new(std::nothrow) T[capacity];
        if (NULL == _buffer) {
            return -1;
        }
        _capacity = capacity;
        return 0;
    }

    // Push an item into the queue.
    // Returns true on pushed.
    // May run in parallel with steal().
    // Never run in parallel with pop() or another push().
    bool push(const T& x) {
        const size_t b = _bottom.load(butil::memory_order_relaxed);
        // acquire pairs with the CAS in steal(): we must not overwrite a
        // slot a thief has not finished taking.
        const size_t t = _top.load(butil::memory_order_acquire);
        if (b >= t + _capacity) { // Full queue.
            return false;
        }
        _buffer[b & (_capacity - 1)] = x;
        // release publishes the stored element before the new bottom
        // becomes visible to stealers.
        _bottom.store(b + 1, butil::memory_order_release);
        return true;
    }

    // Pop an item from the queue.
    // Returns true on popped and the item is written to `val'.
    // May run in parallel with steal().
    // Never run in parallel with push() or another pop().
    bool pop(T* val) {
        const size_t b = _bottom.load(butil::memory_order_relaxed);
        size_t t = _top.load(butil::memory_order_relaxed);
        if (t >= b) {
            // fast check since we call pop() in each sched.
            // Stale _top which is smaller should not enter this branch.
            return false;
        }
        // Tentatively claim the bottom slot by publishing b-1, then use a
        // full fence so this store is ordered before the re-read of _top
        // below (and is seen by the fence/CAS in concurrent steal()s).
        const size_t newb = b - 1;
        _bottom.store(newb, butil::memory_order_relaxed);
        butil::atomic_thread_fence(butil::memory_order_seq_cst);
        t = _top.load(butil::memory_order_relaxed);
        if (t > newb) {
            // Queue turned out to be empty: undo the claim.
            _bottom.store(b, butil::memory_order_relaxed);
            return false;
        }
        *val = _buffer[newb & (_capacity - 1)];
        if (t != newb) {
            // More than one element remained; the claimed slot is ours.
            return true;
        }
        // Single last element, compete with steal()
        const bool popped = _top.compare_exchange_strong(
            t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
        // Either way the queue is now empty at bottom b; restore it so
        // _bottom == _top (winner advanced _top, loser lost the item to a
        // thief).
        _bottom.store(b, butil::memory_order_relaxed);
        return popped;
    }

    // Steal one item from the queue.
    // Returns true on stolen.
    // May run in parallel with push() pop() or another steal().
    bool steal(T* val) {
        size_t t = _top.load(butil::memory_order_acquire);
        size_t b = _bottom.load(butil::memory_order_acquire);
        if (t >= b) {
            // Permit false negative for performance considerations.
            return false;
        }
        do {
            // Full fence orders the re-read of _bottom against the
            // _bottom store + fence in pop(), so a concurrent pop of the
            // last element is not double-consumed.
            butil::atomic_thread_fence(butil::memory_order_seq_cst);
            b = _bottom.load(butil::memory_order_acquire);
            if (t >= b) {
                return false;
            }
            // Speculative copy; only valid if the CAS below claims slot t.
            *val = _buffer[t & (_capacity - 1)];
        } while (!_top.compare_exchange_strong(t, t + 1,
                                               butil::memory_order_seq_cst,
                                               butil::memory_order_relaxed));
        return true;
    }

    // Approximate size: both loads are relaxed, so the result may be stale
    // under concurrent push/pop/steal. Clamped to 0 when a racy snapshot
    // yields b < t.
    size_t volatile_size() const {
        const size_t b = _bottom.load(butil::memory_order_relaxed);
        const size_t t = _top.load(butil::memory_order_relaxed);
        return (b <= t ? 0 : (b - t));
    }

    size_t capacity() const { return _capacity; }

private:
    // Copying a concurrent structure makes no sense.
    DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);

    // Owner's end of the deque (push/pop side).
    butil::atomic<size_t> _bottom;
    size_t _capacity;
    T* _buffer;
    // Thieves' end; on its own cache line to avoid false sharing with
    // the owner-side fields above.
    BAIDU_CACHELINE_ALIGNMENT butil::atomic<size_t> _top;
};
154
155
}  // namespace bthread
156
157
#endif  // BTHREAD_WORK_STEALING_QUEUE_H