Coverage Report

Created: 2026-01-10 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/bthread/id.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Sun Aug  3 12:46:15 CST 2014
21
22
#include <deque>
23
#include "butil/logging.h"
24
#include "bthread/butex.h"                       // butex_*
25
#include "bthread/mutex.h"
26
#include "bthread/list_of_abafree_id.h"
27
#include "butil/resource_pool.h"
28
#include "bthread/bthread.h"
29
30
namespace bthread {
31
32
// This queue reduces the chance to allocate memory for deque
33
template <typename T, int N>
34
class SmallQueue {
35
public:
36
0
    SmallQueue() : _begin(0), _size(0), _full(NULL) {}
37
    
38
0
    void push(const T& val) {
39
0
        if (_full != NULL && !_full->empty()) {
40
0
            _full->push_back(val);
41
0
        } else if (_size < N) {
42
0
            int tail = _begin + _size;
43
0
            if (tail >= N) {
44
0
                tail -= N;
45
0
            }
46
0
            _c[tail] = val;
47
0
            ++_size;
48
0
        } else {
49
0
            if (_full == NULL) {
50
0
                _full = new std::deque<T>;
51
0
            }
52
0
            _full->push_back(val);
53
0
        }
54
0
    }
55
0
    bool pop(T* val) {
56
0
        if (_size > 0) {
57
0
            *val = _c[_begin];
58
0
            ++_begin;
59
0
            if (_begin >= N) {
60
0
                _begin -= N;
61
0
            }
62
0
            --_size;
63
0
            return true;
64
0
        } else if (_full && !_full->empty()) {
65
0
            *val = _full->front();
66
0
            _full->pop_front();
67
0
            return true;
68
0
        }
69
0
        return false;
70
0
    }
71
0
    bool empty() const {
72
0
        return _size == 0 && (_full == NULL || _full->empty());
73
0
    }
74
75
0
    size_t size() const {
76
0
        return _size + (_full ? _full->size() : 0);
77
0
    }
78
79
0
    void clear() {
80
0
        _size = 0;
81
0
        _begin = 0;
82
0
        if (_full) {
83
0
            _full->clear();
84
0
        }
85
0
    }
86
87
0
    ~SmallQueue() {
88
0
        delete _full;
89
0
        _full = NULL;
90
0
    }
91
    
92
private:
93
    DISALLOW_COPY_AND_ASSIGN(SmallQueue);
94
    
95
    int _begin;
96
    int _size;
97
    T _c[N];
98
    std::deque<T>* _full;
99
};
100
101
struct PendingError {
102
    bthread_id_t id;
103
    int error_code;
104
    std::string error_text;
105
    const char *location;
106
107
0
    PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {}
108
};
109
110
struct BAIDU_CACHELINE_ALIGNMENT Id {
111
    // first_ver ~ locked_ver - 1: unlocked versions
112
    // locked_ver: locked
113
    // unlockable_ver: locked and about to be destroyed
114
    // contended_ver: locked and contended
115
    uint32_t first_ver;
116
    uint32_t locked_ver;
117
    FastPthreadMutex mutex;
118
    void* data;
119
    int (*on_error)(bthread_id_t, void*, int);
120
    int (*on_error2)(bthread_id_t, void*, int, const std::string&);
121
    const char *lock_location;
122
    uint32_t* butex;
123
    uint32_t* join_butex;
124
    SmallQueue<PendingError, 2> pending_q;
125
    
126
0
    Id() {
127
        // Although value of the butex(as version part of bthread_id_t)
128
        // does not matter, we set it to 0 to make program more deterministic.
129
0
        butex = bthread::butex_create_checked<uint32_t>();
130
0
        join_butex = bthread::butex_create_checked<uint32_t>();
131
0
        *butex = 0;
132
0
        *join_butex = 0;
133
0
    }
134
135
0
    ~Id() {
136
0
        bthread::butex_destroy(butex);
137
0
        bthread::butex_destroy(join_butex);
138
0
    }
139
140
0
    inline bool has_version(uint32_t id_ver) const {
141
0
        return id_ver >= first_ver && id_ver < locked_ver;
142
0
    }
143
0
    inline uint32_t contended_ver() const { return locked_ver + 1; }
144
0
    inline uint32_t unlockable_ver() const { return locked_ver + 2; }
145
0
    inline uint32_t last_ver() const { return unlockable_ver(); }
146
    
147
    // also the next "first_ver"
148
0
    inline uint32_t end_ver() const { return last_ver() + 1; }
149
};
150
151
// Checks that sizeof(Id) is a multiple of 64.
// NOTE(review): presumably 64 is the cache-line size targeted by
// BAIDU_CACHELINE_ALIGNMENT -- confirm against the macro definition.
BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align);
152
153
// Handle of an Id in the resource pool; its `value` is stored in the upper
// 32 bits of a bthread_id_t (see make_id/get_slot).
typedef butil::ResourceId<Id> IdResourceId;
154
155
0
// Packs `slot` into the upper 32 bits and `version` into the lower 32 bits
// of a bthread_id_t.
inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    const uint64_t slot_bits = ((uint64_t)slot.value) << 32;
    const bthread_id_t packed = { slot_bits | (uint64_t)version };
    return packed;
}
160
161
0
// Extracts the resource slot stored in the upper 32 bits of `id`.
inline IdResourceId get_slot(bthread_id_t id) {
    const IdResourceId slot = { (id.value >> 32) };
    return slot;
}
165
166
0
// Extracts the version stored in the lower 32 bits of `id`.
inline uint32_t get_version(bthread_id_t id) {
    const uint64_t low_bits = id.value & 0xFFFFFFFFul;
    return (uint32_t)low_bits;
}
169
170
0
// Returns true when the version of `id` lies in [first_ver, last_ver()] of
// its slot, false when the slot cannot be addressed or the version is out of
// that range. The slot's mutex is not taken here.
inline bool id_exists_with_true_negatives(bthread_id_t id) {
    Id* const slot_meta = address_resource(get_slot(id));
    if (slot_meta == NULL) {
        return false;
    }
    const uint32_t ver = bthread::get_version(id);
    if (ver < slot_meta->first_ver) {
        return false;
    }
    return ver <= slot_meta->last_ver();
}
178
// required by unittest
179
0
// Returns the current value of the butex behind `id`, or 0 when the slot
// cannot be addressed (a valid version is never zero).
uint32_t id_value(bthread_id_t id) {
    Id* const slot_meta = address_resource(get_slot(id));
    return (slot_meta != NULL) ? *slot_meta->butex : 0;
}
186
187
0
// on_error callback installed when the creator passes none: it ignores the
// data pointer and the error code and simply unlocks and destroys the id.
static int default_bthread_id_on_error(bthread_id_t id,
                                       void* /*data*/,
                                       int /*error_code*/) {
    const int rc = bthread_id_unlock_and_destroy(id);
    return rc;
}
190
static int default_bthread_id_on_error2(
191
0
    bthread_id_t id, void*, int, const std::string&) {
192
0
    return bthread_id_unlock_and_destroy(id);
193
0
}
194
195
0
// Returns `location` when it is non-NULL and a fixed placeholder otherwise.
// Streaming a null `const char*` into an ostream is undefined behavior, and
// neither Id::lock_location nor PendingError::location is guaranteed to be
// set (e.g. bthread_id_trylock does not record a location).
static inline const char* id_status_location_or_unknown(const char* location) {
    return location != NULL ? location : "(unknown)";
}

// Writes a human-readable description of `id` to `os`.
//
// The state of the id is copied while holding the slot's mutex and formatted
// after the mutex has been released. When the slot cannot be addressed or the
// version of `id` is not valid for the slot, "Invalid id=<value>" is printed.
void id_status(bthread_id_t id, std::ostream &os) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (!meta) {
        os << "Invalid id=" << id.value << '\n';
        return;
    }
    const uint32_t id_ver = bthread::get_version(id);
    uint32_t* butex = meta->butex;
    bool valid = true;
    void* data = NULL;
    int (*on_error)(bthread_id_t, void*, int) = NULL;
    int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL;
    uint32_t first_ver = 0;
    uint32_t locked_ver = 0;
    uint32_t unlockable_ver = 0;
    uint32_t contended_ver = 0;
    const char *lock_location = NULL;
    SmallQueue<PendingError, 2> pending_q;
    uint32_t butex_value = 0;

    // Take a snapshot under the lock.
    meta->mutex.lock();
    if (meta->has_version(id_ver)) {
        data = meta->data;
        on_error = meta->on_error;
        on_error2 = meta->on_error2;
        first_ver = meta->first_ver;
        locked_ver = meta->locked_ver;
        unlockable_ver = meta->unlockable_ver();
        contended_ver = meta->contended_ver();
        lock_location = meta->lock_location;
        // SmallQueue cannot be iterated or copied: rotate the queue once,
        // popping each item and pushing it back, to copy its content while
        // leaving the original order intact.
        const size_t size = meta->pending_q.size();
        for (size_t i = 0; i < size; ++i) {
            PendingError front;
            meta->pending_q.pop(&front);
            meta->pending_q.push(front);
            pending_q.push(front);
        }
        butex_value = *butex;
    } else {
        valid = false;
    }
    meta->mutex.unlock();

    if (valid) {
        os << "First id: "
           << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n'
           << "Range: " << locked_ver - first_ver << '\n'
           << "Status: ";
        if (butex_value != first_ver) {
            os << "LOCKED at "
               << id_status_location_or_unknown(lock_location);
            if (butex_value == contended_ver) {
                os << " (CONTENDED)";
            } else if (butex_value == unlockable_ver) {
                os << " (ABOUT TO DESTROY)";
            } else {
                os << " (UNCONTENDED)";
            }
        } else {
            os << "UNLOCKED";
        }
        os << "\nPendingQ:";
        if (pending_q.empty()) {
            os << " EMPTY";
        } else {
            const size_t size = pending_q.size();
            for (size_t i = 0; i < size; ++i) {
                PendingError front;
                pending_q.pop(&front);
                os << " (" << id_status_location_or_unknown(front.location)
                   << "/E" << front.error_code
                   << '/' << front.error_text << ')';
            }
        }
        if (on_error) {
            if (on_error == default_bthread_id_on_error) {
                os << "\nOnError: unlock_and_destroy";
            } else {
                os << "\nOnError: " << (void*)on_error;
            }
        } else {
            if (on_error2 == default_bthread_id_on_error2) {
                os << "\nOnError2: unlock_and_destroy";
            } else {
                os << "\nOnError2: " << (void*)on_error2;
            }
        }
        os << "\nData: " << data;
    } else {
        os << "Invalid id=" << id.value;
    }
    os << '\n';
}
286
287
0
void id_pool_status(std::ostream &os) {
288
0
    os << butil::describe_resources<Id>() << '\n';
289
0
}
290
291
struct IdTraits {
292
    static const size_t BLOCK_SIZE = 63;
293
    static const size_t MAX_ENTRIES = 100000;
294
    static const size_t INIT_GC_SIZE = 4096;
295
    static const bthread_id_t ID_INIT;
296
    static bool exists(bthread_id_t id)
297
0
    { return bthread::id_exists_with_true_negatives(id); }
298
};
299
const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID;
300
301
// Concrete list type stored behind bthread_id_list_t::impl.
typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList;
302
303
struct IdResetter {
304
    explicit IdResetter(int ec, const std::string& et)
305
0
        : _error_code(ec), _error_text(et) {}
306
0
    void operator()(bthread_id_t & id) const {
307
0
        bthread_id_error2_verbose(
308
0
            id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__));
309
0
        id = INVALID_BTHREAD_ID;
310
0
    }
311
private:
312
    int _error_code;
313
    const std::string& _error_text;
314
};
315
316
0
size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) {
317
0
    if (list->impl == NULL) {
318
0
        return 0;
319
0
    }
320
0
    return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n);
321
0
}
322
323
// Largest `range` accepted by id_create_ranged_impl() and
// bthread_id_lock_and_reset_range_verbose(); also used by the create
// functions to detect an upcoming wrap-around of the version counter.
const int ID_MAX_RANGE = 1024;
324
325
static int id_create_impl(
326
    bthread_id_t* id, void* data,
327
    int (*on_error)(bthread_id_t, void*, int),
328
0
    int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
329
0
    IdResourceId slot;
330
0
    Id* const meta = get_resource(&slot);
331
0
    if (meta) {
332
0
        meta->data = data;
333
0
        meta->on_error = on_error;
334
0
        meta->on_error2 = on_error2;
335
0
        CHECK(meta->pending_q.empty());
336
0
        uint32_t* butex = meta->butex;
337
0
        if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
338
            // Skip 0 so that bthread_id_t is never 0
339
            // avoid overflow to make comparisons simpler.
340
0
            *butex = 1;
341
0
        }
342
0
        *meta->join_butex = *butex;
343
0
        meta->first_ver = *butex;
344
0
        meta->locked_ver = *butex + 1;
345
0
        *id = make_id(*butex, slot);
346
0
        return 0;
347
0
    }
348
0
    return ENOMEM;
349
0
}
350
351
static int id_create_ranged_impl(
352
    bthread_id_t* id, void* data,
353
    int (*on_error)(bthread_id_t, void*, int),
354
    int (*on_error2)(bthread_id_t, void*, int, const std::string&),
355
0
    int range) {
356
0
    if (range < 1 || range > ID_MAX_RANGE) {
357
0
        LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range;
358
0
        LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is " 
359
0
                << ID_MAX_RANGE << ", actually " << range;
360
0
        return EINVAL;
361
0
    }
362
0
    IdResourceId slot;
363
0
    Id* const meta = get_resource(&slot);
364
0
    if (meta) {
365
0
        meta->data = data;
366
0
        meta->on_error = on_error;
367
0
        meta->on_error2 = on_error2;
368
0
        CHECK(meta->pending_q.empty());
369
0
        uint32_t* butex = meta->butex;
370
0
        if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
371
            // Skip 0 so that bthread_id_t is never 0
372
            // avoid overflow to make comparisons simpler.
373
0
            *butex = 1;
374
0
        }
375
0
        *meta->join_butex = *butex;
376
0
        meta->first_ver = *butex;
377
0
        meta->locked_ver = *butex + range;
378
0
        *id = make_id(*butex, slot);
379
0
        return 0;
380
0
    }
381
0
    return ENOMEM;
382
0
}
383
384
}  // namespace bthread
385
386
extern "C" {
387
388
int bthread_id_create(
389
    bthread_id_t* id, void* data,
390
0
    int (*on_error)(bthread_id_t, void*, int)) {
391
0
    return bthread::id_create_impl(
392
0
        id, data,
393
0
        (on_error ? on_error : bthread::default_bthread_id_on_error), NULL);
394
0
}
395
396
int bthread_id_create_ranged(bthread_id_t* id, void* data,
397
                             int (*on_error)(bthread_id_t, void*, int),
398
0
                             int range) {
399
0
    return bthread::id_create_ranged_impl(
400
0
        id, data, 
401
0
        (on_error ? on_error : bthread::default_bthread_id_on_error),
402
0
        NULL, range);
403
0
}
404
405
int bthread_id_lock_and_reset_range_verbose(
406
0
    bthread_id_t id, void **pdata, int range, const char *location) {
407
0
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
408
0
    if (!meta) {
409
0
        return EINVAL;
410
0
    }
411
0
    const uint32_t id_ver = bthread::get_version(id);
412
0
    uint32_t* butex = meta->butex;
413
0
    bool ever_contended = false;
414
0
    meta->mutex.lock();
415
0
    while (meta->has_version(id_ver)) {
416
0
        if (*butex == meta->first_ver) {
417
            // contended locker always wakes up the butex at unlock.
418
0
            meta->lock_location = location;
419
0
            if (range == 0) {
420
                // fast path
421
0
            } else if (range < 0 ||
422
0
                       range > bthread::ID_MAX_RANGE ||
423
0
                       range + meta->first_ver <= meta->locked_ver) {
424
0
                LOG_IF(FATAL, range < 0) << "range must be positive, actually "
425
0
                                         << range;
426
0
                LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
427
0
                    << "max range is " << bthread::ID_MAX_RANGE
428
0
                    << ", actually " << range;
429
0
            } else {
430
0
                meta->locked_ver = meta->first_ver + range;
431
0
            }
432
0
            *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
433
0
            meta->mutex.unlock();
434
0
            if (pdata) {
435
0
                *pdata = meta->data;
436
0
            }
437
0
            return 0;
438
0
        } else if (*butex != meta->unlockable_ver()) {
439
0
            *butex = meta->contended_ver();
440
0
            uint32_t expected_ver = *butex;
441
0
            meta->mutex.unlock();
442
0
            ever_contended = true;
443
0
            if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
444
0
                errno != EWOULDBLOCK && errno != EINTR) {
445
0
                return errno;
446
0
            }
447
0
            meta->mutex.lock();
448
0
        } else { // bthread_id_about_to_destroy was called.
449
0
            meta->mutex.unlock();
450
0
            return EPERM;
451
0
        }
452
0
    }
453
0
    meta->mutex.unlock();
454
0
    return EINVAL;
455
0
}
456
457
int bthread_id_error_verbose(bthread_id_t id, int error_code, 
458
0
                             const char *location) {
459
0
    return bthread_id_error2_verbose(id, error_code, std::string(), location);
460
0
}
461
462
0
// Marks a locked id as "about to be destroyed" and wakes up the waiting
// lockers when the id was contended.
// Returns 0 on success, EINVAL when the id is invalid, EPERM (after logging
// FATAL) when the id is not locked.
int bthread_id_about_to_destroy(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
    uint32_t* const butex = meta->butex;

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    const uint32_t current = *butex;
    if (current == meta->first_ver) {
        meta->mutex.unlock();
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    const bool contended = (current == meta->contended_ver());
    *butex = meta->unlockable_ver();
    meta->mutex.unlock();

    if (contended) {
        // wake up all waiting lockers.
        bthread::butex_wake_except(butex, 0);
    }
    return 0;
}
488
489
0
// Destroys an id that is currently unlocked and returns its slot to the
// resource pool.
// Returns 0 on success, EINVAL when the id is invalid, EPERM when the id is
// locked.
int bthread_id_cancel(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
        meta->mutex.unlock();
        return EPERM;
    }
    // Jump past every version of this id so that all of them become invalid.
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
    meta->mutex.unlock();

    return_resource(bthread::get_slot(id));
    return 0;
}
512
513
0
// Blocks until the version of `id` is no longer valid for its slot.
// Returns 0 once the id is gone, EINVAL when the slot cannot be addressed,
// or the errno of a failed butex_wait (other than EWOULDBLOCK/EINTR).
int bthread_id_join(bthread_id_t id) {
    const bthread::IdResourceId slot = bthread::get_slot(id);
    bthread::Id* const meta = address_resource(slot);
    if (meta == NULL) {
        // The id is not created yet, this join is definitely wrong.
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
    uint32_t* const join_butex = meta->join_butex;
    for (;;) {
        // Sample validity and the butex value together under the lock, so
        // the wait below is started with a value matching that check.
        meta->mutex.lock();
        const bool still_valid = meta->has_version(id_ver);
        const uint32_t expected_ver = *join_butex;
        meta->mutex.unlock();
        if (!still_valid) {
            return 0;
        }
        if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
}
537
538
0
// Locks `id` only when it is currently unlocked; never waits.
// When `pdata` is non-NULL it receives the data pointer of the id on
// success. The lock location of the id is not updated by this function.
// Returns 0 on success, EINVAL when the id is invalid, EBUSY when the id is
// already locked.
int bthread_id_trylock(bthread_id_t id, void** pdata) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
        meta->mutex.unlock();
        return EBUSY;
    }
    *butex = meta->locked_ver;
    meta->mutex.unlock();

    if (pdata != NULL) {
        *pdata = meta->data;
    }
    return 0;
}
561
562
int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
563
0
                            const char *location) {
564
0
    return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
565
0
}
566
567
0
// Unlocks a locked id.
//
// When errors were queued while the id was locked, the oldest one is popped
// and delivered to the error callback instead of unlocking; the id stays
// locked and the callback's return value is returned. Otherwise the id is
// unlocked, a waiter is woken up when the id was contended, and 0 is
// returned.
// Returns EINVAL (after logging FATAL) when the id is invalid and EPERM
// (after logging FATAL) when the id is not locked.
int bthread_id_unlock(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        meta->mutex.unlock();
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }

    bthread::PendingError pending;
    if (!meta->pending_q.pop(&pending)) {
        // Nothing queued: really unlock.
        const bool contended = (*butex == meta->contended_ver());
        *butex = meta->first_ver;
        meta->mutex.unlock();
        if (contended) {
            // We may wake up already-reused id, but that's OK.
            bthread::butex_wake(butex);
        }
        return 0; 
    }

    // Hand the still-locked id over to the callback of the queued error.
    meta->lock_location = pending.location;
    meta->mutex.unlock();
    if (meta->on_error) {
        return meta->on_error(pending.id, meta->data, pending.error_code);
    }
    return meta->on_error2(pending.id, meta->data, pending.error_code,
                           pending.error_text);
}
608
609
0
// Destroys a locked id: invalidates all of its versions, discards queued
// errors, wakes up lockers and joiners and returns the slot to the resource
// pool.
// Returns 0 on success, EINVAL (after logging FATAL) when the id is invalid
// and EPERM (after logging FATAL) when the id is not locked.
int bthread_id_unlock_and_destroy(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    uint32_t* const join_butex = meta->join_butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        meta->mutex.unlock();
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    // Jump past every version of this id so that all of them become invalid.
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    *join_butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
    meta->pending_q.clear();
    meta->mutex.unlock();

    // Notice that butex_wake* returns # of woken-up, not successful or not.
    bthread::butex_wake_except(butex, 0);
    bthread::butex_wake_all(join_butex);
    return_resource(bthread::get_slot(id));
    return 0;
}
641
642
int bthread_id_list_init(bthread_id_list_t* list,
643
                         unsigned /*size*/,
644
0
                         unsigned /*conflict_size*/) {
645
0
    list->impl = NULL;  // create on demand.
646
    // Set unused fields to zero as well.
647
0
    list->head = 0;
648
0
    list->size = 0;
649
0
    list->conflict_head = 0;
650
0
    list->conflict_size = 0;
651
0
    return 0;
652
0
}
653
654
0
// Deletes the implementation object of `list` (if any) and resets the
// pointer, so that the list can be destroyed again or reused.
void bthread_id_list_destroy(bthread_id_list_t* list) {
    bthread::IdList* const impl = static_cast<bthread::IdList*>(list->impl);
    delete impl;
    list->impl = NULL;
}
658
659
0
// Adds `id` to `list`, creating the implementation object on first use.
// Returns ENOMEM when that allocation fails, otherwise the result of
// IdList::add().
int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) {
    bthread::IdList* impl = static_cast<bthread::IdList*>(list->impl);
    if (impl == NULL) {
        impl = new (std::nothrow) bthread::IdList;
        list->impl = impl;
        if (impl == NULL) {
            return ENOMEM;
        }
    }
    return impl->add(id);
}
668
669
0
// Same as bthread_id_list_reset2() with an empty error text.
int bthread_id_list_reset(bthread_id_list_t* list, int error_code) {
    const std::string no_text;
    return bthread_id_list_reset2(list, error_code, no_text);
}
672
673
void bthread_id_list_swap(bthread_id_list_t* list1, 
674
0
                          bthread_id_list_t* list2) {
675
0
    std::swap(list1->impl, list2->impl);
676
0
}
677
678
int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code,
679
0
                                       pthread_mutex_t* mutex) {
680
0
    return bthread_id_list_reset2_pthreadsafe(
681
0
        list, error_code, std::string(), mutex);
682
0
}
683
684
int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code,
685
0
                                      bthread_mutex_t* mutex) {
686
0
    return bthread_id_list_reset2_bthreadsafe(
687
0
        list, error_code, std::string(), mutex);
688
0
}
689
690
}  // extern "C"
691
692
int bthread_id_create2(
693
    bthread_id_t* id, void* data,
694
0
    int (*on_error)(bthread_id_t, void*, int, const std::string&)) {
695
0
    return bthread::id_create_impl(
696
0
        id, data, NULL,
697
0
        (on_error ? on_error : bthread::default_bthread_id_on_error2));
698
0
}
699
700
int bthread_id_create2_ranged(
701
    bthread_id_t* id, void* data,
702
    int (*on_error)(bthread_id_t, void*, int, const std::string&),
703
0
    int range) {
704
0
    return bthread::id_create_ranged_impl(
705
0
        id, data, NULL,
706
0
        (on_error ? on_error : bthread::default_bthread_id_on_error2), range);
707
0
}
708
709
int bthread_id_error2_verbose(bthread_id_t id, int error_code,
710
                              const std::string& error_text,
711
0
                              const char *location) {
712
0
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
713
0
    if (!meta) {
714
0
        return EINVAL;
715
0
    }
716
0
    const uint32_t id_ver = bthread::get_version(id);
717
0
    uint32_t* butex = meta->butex;
718
0
    meta->mutex.lock();
719
0
    if (!meta->has_version(id_ver)) {
720
0
        meta->mutex.unlock();
721
0
        return EINVAL;
722
0
    }
723
0
    if (*butex == meta->first_ver) {
724
0
        *butex = meta->locked_ver;
725
0
        meta->lock_location = location;
726
0
        meta->mutex.unlock();
727
0
        if (meta->on_error) {
728
0
            return meta->on_error(id, meta->data, error_code);
729
0
        } else {
730
0
            return meta->on_error2(id, meta->data, error_code, error_text);
731
0
        }
732
0
    } else {
733
0
        bthread::PendingError e;
734
0
        e.id = id;
735
0
        e.error_code = error_code;
736
0
        e.error_text = error_text;
737
0
        e.location = location;
738
0
        meta->pending_q.push(e);
739
0
        meta->mutex.unlock();
740
0
        return 0;
741
0
    }
742
0
}
743
744
int bthread_id_list_reset2(bthread_id_list_t* list,
745
                           int error_code,
746
0
                           const std::string& error_text) {
747
0
    if (list->impl != NULL) {
748
0
        static_cast<bthread::IdList*>(list->impl)->apply(
749
0
            bthread::IdResetter(error_code, error_text));
750
0
    }
751
0
    return 0;
752
0
}
753
754
int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list,
755
                                       int error_code,
756
                                       const std::string& error_text,
757
0
                                       pthread_mutex_t* mutex) {
758
0
    if (mutex == NULL) {
759
0
        return EINVAL;
760
0
    }
761
0
    if (list->impl == NULL) {
762
0
        return 0;
763
0
    }
764
0
    bthread_id_list_t tmplist;
765
0
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
766
0
    if (rc != 0) {
767
0
        return rc;
768
0
    }
769
    // Swap out the list then reset. The critical section is very small.
770
0
    pthread_mutex_lock(mutex);
771
0
    std::swap(list->impl, tmplist.impl);
772
0
    pthread_mutex_unlock(mutex);
773
0
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
774
0
    bthread_id_list_destroy(&tmplist);
775
0
    return rc2;
776
0
}
777
778
int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list,
779
                                       int error_code,
780
                                       const std::string& error_text,
781
0
                                       bthread_mutex_t* mutex) {
782
0
    if (mutex == NULL) {
783
0
        return EINVAL;
784
0
    }
785
0
    if (list->impl == NULL) {
786
0
        return 0;
787
0
    }
788
0
    bthread_id_list_t tmplist;
789
0
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
790
0
    if (rc != 0) {
791
0
        return rc;
792
0
    }
793
    // Swap out the list then reset. The critical section is very small.
794
0
    bthread_mutex_lock(mutex);
795
0
    std::swap(list->impl, tmplist.impl);
796
0
    bthread_mutex_unlock(mutex);
797
0
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
798
0
    bthread_id_list_destroy(&tmplist);
799
0
    return rc2;
800
0
}