/src/brpc/src/bthread/id.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: Sun Aug 3 12:46:15 CST 2014 |
21 | | |
22 | | #include <deque> |
23 | | #include "butil/logging.h" |
24 | | #include "bthread/butex.h" // butex_* |
25 | | #include "bthread/mutex.h" |
26 | | #include "bthread/list_of_abafree_id.h" |
27 | | #include "butil/resource_pool.h" |
28 | | #include "bthread/bthread.h" |
29 | | |
30 | | namespace bthread { |
31 | | |
// A FIFO queue that stores up to N elements in an inline ring buffer and
// only spills into a heap-allocated std::deque once the ring is full. This
// reduces the chance to allocate memory for the deque.
//
// Requirements on T: default-constructible and copy-assignable.
// The class does no locking itself; callers must serialize access.
template <typename T, int N>
class SmallQueue {
public:
    SmallQueue() : _begin(0), _size(0), _full(NULL) {}

    ~SmallQueue() {
        delete _full;
        _full = NULL;
    }

    // Append a copy of `val' to the tail of the queue.
    void push(const T& val) {
        if (_full != NULL && !_full->empty()) {
            // Some elements already spilled. Keep appending to the deque,
            // otherwise a new element stored in the ring would be popped
            // before the older ones waiting in the deque.
            _full->push_back(val);
        } else if (_size < N) {
            int tail = _begin + _size;
            if (tail >= N) {
                tail -= N;
            }
            _c[tail] = val;
            ++_size;
        } else {
            if (_full == NULL) {
                _full = new std::deque<T>;
            }
            _full->push_back(val);
        }
    }

    // Remove the oldest element and copy it into `*val'.
    // Returns false (leaving `*val' untouched) when the queue is empty.
    bool pop(T* val) {
        if (_size > 0) {
            *val = _c[_begin];
            // Reset the slot so that it does not keep resources owned by
            // the popped element (e.g. a string buffer) alive until the
            // slot happens to be overwritten.
            _c[_begin] = T();
            ++_begin;
            if (_begin >= N) {
                _begin -= N;
            }
            --_size;
            return true;
        } else if (_full && !_full->empty()) {
            *val = _full->front();
            _full->pop_front();
            return true;
        }
        return false;
    }

    bool empty() const {
        return _size == 0 && (_full == NULL || _full->empty());
    }

    // Total number of queued elements (ring + spilled).
    size_t size() const {
        return _size + (_full ? _full->size() : 0);
    }

    // Drop all elements. The spill deque (if any) is kept for reuse.
    void clear() {
        // Release whatever the occupied ring slots are holding.
        for (int i = 0; i < _size; ++i) {
            int idx = _begin + i;
            if (idx >= N) {
                idx -= N;
            }
            _c[idx] = T();
        }
        _size = 0;
        _begin = 0;
        if (_full) {
            _full->clear();
        }
    }

private:
    // Copying is disallowed: declared private and intentionally left
    // undefined (what DISALLOW_COPY_AND_ASSIGN expresses).
    SmallQueue(const SmallQueue&);
    void operator=(const SmallQueue&);

    int _begin;            // index of the oldest element in _c
    int _size;             // number of elements currently stored in _c
    T _c[N];               // inline ring buffer
    std::deque<T>* _full;  // overflow storage, allocated on first spill
};
100 | | |
101 | | struct PendingError { |
102 | | bthread_id_t id; |
103 | | int error_code; |
104 | | std::string error_text; |
105 | | const char *location; |
106 | | |
107 | 0 | PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {} |
108 | | }; |
109 | | |
110 | | struct BAIDU_CACHELINE_ALIGNMENT Id { |
111 | | // first_ver ~ locked_ver - 1: unlocked versions |
112 | | // locked_ver: locked |
113 | | // unlockable_ver: locked and about to be destroyed |
114 | | // contended_ver: locked and contended |
115 | | uint32_t first_ver; |
116 | | uint32_t locked_ver; |
117 | | FastPthreadMutex mutex; |
118 | | void* data; |
119 | | int (*on_error)(bthread_id_t, void*, int); |
120 | | int (*on_error2)(bthread_id_t, void*, int, const std::string&); |
121 | | const char *lock_location; |
122 | | uint32_t* butex; |
123 | | uint32_t* join_butex; |
124 | | SmallQueue<PendingError, 2> pending_q; |
125 | | |
126 | 0 | Id() { |
127 | | // Although value of the butex(as version part of bthread_id_t) |
128 | | // does not matter, we set it to 0 to make program more deterministic. |
129 | 0 | butex = bthread::butex_create_checked<uint32_t>(); |
130 | 0 | join_butex = bthread::butex_create_checked<uint32_t>(); |
131 | 0 | *butex = 0; |
132 | 0 | *join_butex = 0; |
133 | 0 | } |
134 | | |
135 | 0 | ~Id() { |
136 | 0 | bthread::butex_destroy(butex); |
137 | 0 | bthread::butex_destroy(join_butex); |
138 | 0 | } |
139 | | |
140 | 0 | inline bool has_version(uint32_t id_ver) const { |
141 | 0 | return id_ver >= first_ver && id_ver < locked_ver; |
142 | 0 | } |
143 | 0 | inline uint32_t contended_ver() const { return locked_ver + 1; } |
144 | 0 | inline uint32_t unlockable_ver() const { return locked_ver + 2; } |
145 | 0 | inline uint32_t last_ver() const { return unlockable_ver(); } |
146 | | |
147 | | // also the next "first_ver" |
148 | 0 | inline uint32_t end_ver() const { return last_ver() + 1; } |
149 | | }; |
150 | | |
// Compile-time check that sizeof(Id) is a multiple of 64 bytes
// (presumably the cache-line size assumed by BAIDU_CACHELINE_ALIGNMENT).
BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align);

// Handle of an Id slot in the resource pool; stored in the upper 32 bits
// of a bthread_id_t by make_id().
typedef butil::ResourceId<Id> IdResourceId;
154 | | |
155 | 0 | inline bthread_id_t make_id(uint32_t version, IdResourceId slot) { |
156 | 0 | const bthread_id_t tmp = |
157 | 0 | { (((uint64_t)slot.value) << 32) | (uint64_t)version }; |
158 | 0 | return tmp; |
159 | 0 | } |
160 | | |
161 | 0 | inline IdResourceId get_slot(bthread_id_t id) { |
162 | 0 | const IdResourceId tmp = { (id.value >> 32) }; |
163 | 0 | return tmp; |
164 | 0 | } |
165 | | |
166 | 0 | inline uint32_t get_version(bthread_id_t id) { |
167 | 0 | return (uint32_t)(id.value & 0xFFFFFFFFul); |
168 | 0 | } |
169 | | |
170 | 0 | inline bool id_exists_with_true_negatives(bthread_id_t id) { |
171 | 0 | Id* const meta = address_resource(get_slot(id)); |
172 | 0 | if (meta == NULL) { |
173 | 0 | return false; |
174 | 0 | } |
175 | 0 | const uint32_t id_ver = bthread::get_version(id); |
176 | 0 | return id_ver >= meta->first_ver && id_ver <= meta->last_ver(); |
177 | 0 | } |
178 | | // required by unittest |
179 | 0 | uint32_t id_value(bthread_id_t id) { |
180 | 0 | Id* const meta = address_resource(get_slot(id)); |
181 | 0 | if (meta != NULL) { |
182 | 0 | return *meta->butex; |
183 | 0 | } |
184 | 0 | return 0; // valid version never be zero |
185 | 0 | } |
186 | | |
187 | 0 | static int default_bthread_id_on_error(bthread_id_t id, void*, int) { |
188 | 0 | return bthread_id_unlock_and_destroy(id); |
189 | 0 | } |
190 | | static int default_bthread_id_on_error2( |
191 | 0 | bthread_id_t id, void*, int, const std::string&) { |
192 | 0 | return bthread_id_unlock_and_destroy(id); |
193 | 0 | } |
194 | | |
195 | 0 | void id_status(bthread_id_t id, std::ostream &os) { |
196 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
197 | 0 | if (!meta) { |
198 | 0 | os << "Invalid id=" << id.value << '\n'; |
199 | 0 | return; |
200 | 0 | } |
201 | 0 | const uint32_t id_ver = bthread::get_version(id); |
202 | 0 | uint32_t* butex = meta->butex; |
203 | 0 | bool valid = true; |
204 | 0 | void* data = NULL; |
205 | 0 | int (*on_error)(bthread_id_t, void*, int) = NULL; |
206 | 0 | int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL; |
207 | 0 | uint32_t first_ver = 0; |
208 | 0 | uint32_t locked_ver = 0; |
209 | 0 | uint32_t unlockable_ver = 0; |
210 | 0 | uint32_t contended_ver = 0; |
211 | 0 | const char *lock_location = NULL; |
212 | 0 | SmallQueue<PendingError, 2> pending_q; |
213 | 0 | uint32_t butex_value = 0; |
214 | |
|
215 | 0 | meta->mutex.lock(); |
216 | 0 | if (meta->has_version(id_ver)) { |
217 | 0 | data = meta->data; |
218 | 0 | on_error = meta->on_error; |
219 | 0 | on_error2 = meta->on_error2; |
220 | 0 | first_ver = meta->first_ver; |
221 | 0 | locked_ver = meta->locked_ver; |
222 | 0 | unlockable_ver = meta->unlockable_ver(); |
223 | 0 | contended_ver = meta->contended_ver(); |
224 | 0 | lock_location = meta->lock_location; |
225 | 0 | const size_t size = meta->pending_q.size(); |
226 | 0 | for (size_t i = 0; i < size; ++i) { |
227 | 0 | PendingError front; |
228 | 0 | meta->pending_q.pop(&front); |
229 | 0 | meta->pending_q.push(front); |
230 | 0 | pending_q.push(front); |
231 | 0 | } |
232 | 0 | butex_value = *butex; |
233 | 0 | } else { |
234 | 0 | valid = false; |
235 | 0 | } |
236 | 0 | meta->mutex.unlock(); |
237 | |
|
238 | 0 | if (valid) { |
239 | 0 | os << "First id: " |
240 | 0 | << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n' |
241 | 0 | << "Range: " << locked_ver - first_ver << '\n' |
242 | 0 | << "Status: "; |
243 | 0 | if (butex_value != first_ver) { |
244 | 0 | os << "LOCKED at " << lock_location; |
245 | 0 | if (butex_value == contended_ver) { |
246 | 0 | os << " (CONTENDED)"; |
247 | 0 | } else if (butex_value == unlockable_ver) { |
248 | 0 | os << " (ABOUT TO DESTROY)"; |
249 | 0 | } else { |
250 | 0 | os << " (UNCONTENDED)"; |
251 | 0 | } |
252 | 0 | } else { |
253 | 0 | os << "UNLOCKED"; |
254 | 0 | } |
255 | 0 | os << "\nPendingQ:"; |
256 | 0 | if (pending_q.empty()) { |
257 | 0 | os << " EMPTY"; |
258 | 0 | } else { |
259 | 0 | const size_t size = pending_q.size(); |
260 | 0 | for (size_t i = 0; i < size; ++i) { |
261 | 0 | PendingError front; |
262 | 0 | pending_q.pop(&front); |
263 | 0 | os << " (" << front.location << "/E" << front.error_code |
264 | 0 | << '/' << front.error_text << ')'; |
265 | 0 | } |
266 | 0 | } |
267 | 0 | if (on_error) { |
268 | 0 | if (on_error == default_bthread_id_on_error) { |
269 | 0 | os << "\nOnError: unlock_and_destroy"; |
270 | 0 | } else { |
271 | 0 | os << "\nOnError: " << (void*)on_error; |
272 | 0 | } |
273 | 0 | } else { |
274 | 0 | if (on_error2 == default_bthread_id_on_error2) { |
275 | 0 | os << "\nOnError2: unlock_and_destroy"; |
276 | 0 | } else { |
277 | 0 | os << "\nOnError2: " << (void*)on_error2; |
278 | 0 | } |
279 | 0 | } |
280 | 0 | os << "\nData: " << data; |
281 | 0 | } else { |
282 | 0 | os << "Invalid id=" << id.value; |
283 | 0 | } |
284 | 0 | os << '\n'; |
285 | 0 | } |
286 | | |
287 | 0 | void id_pool_status(std::ostream &os) { |
288 | 0 | os << butil::describe_resources<Id>() << '\n'; |
289 | 0 | } |
290 | | |
291 | | struct IdTraits { |
292 | | static const size_t BLOCK_SIZE = 63; |
293 | | static const size_t MAX_ENTRIES = 100000; |
294 | | static const size_t INIT_GC_SIZE = 4096; |
295 | | static const bthread_id_t ID_INIT; |
296 | | static bool exists(bthread_id_t id) |
297 | 0 | { return bthread::id_exists_with_true_negatives(id); } |
298 | | }; |
299 | | const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID; |
300 | | |
301 | | typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList; |
302 | | |
303 | | struct IdResetter { |
304 | | explicit IdResetter(int ec, const std::string& et) |
305 | 0 | : _error_code(ec), _error_text(et) {} |
306 | 0 | void operator()(bthread_id_t & id) const { |
307 | 0 | bthread_id_error2_verbose( |
308 | 0 | id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__)); |
309 | 0 | id = INVALID_BTHREAD_ID; |
310 | 0 | } |
311 | | private: |
312 | | int _error_code; |
313 | | const std::string& _error_text; |
314 | | }; |
315 | | |
316 | 0 | size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) { |
317 | 0 | if (list->impl == NULL) { |
318 | 0 | return 0; |
319 | 0 | } |
320 | 0 | return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n); |
321 | 0 | } |
322 | | |
// Largest `range' (number of consecutive valid versions of one id)
// accepted when creating a ranged id or when resetting the range at lock.
const int ID_MAX_RANGE = 1024;
324 | | |
325 | | static int id_create_impl( |
326 | | bthread_id_t* id, void* data, |
327 | | int (*on_error)(bthread_id_t, void*, int), |
328 | 0 | int (*on_error2)(bthread_id_t, void*, int, const std::string&)) { |
329 | 0 | IdResourceId slot; |
330 | 0 | Id* const meta = get_resource(&slot); |
331 | 0 | if (meta) { |
332 | 0 | meta->data = data; |
333 | 0 | meta->on_error = on_error; |
334 | 0 | meta->on_error2 = on_error2; |
335 | 0 | CHECK(meta->pending_q.empty()); |
336 | 0 | uint32_t* butex = meta->butex; |
337 | 0 | if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) { |
338 | | // Skip 0 so that bthread_id_t is never 0 |
339 | | // avoid overflow to make comparisons simpler. |
340 | 0 | *butex = 1; |
341 | 0 | } |
342 | 0 | *meta->join_butex = *butex; |
343 | 0 | meta->first_ver = *butex; |
344 | 0 | meta->locked_ver = *butex + 1; |
345 | 0 | *id = make_id(*butex, slot); |
346 | 0 | return 0; |
347 | 0 | } |
348 | 0 | return ENOMEM; |
349 | 0 | } |
350 | | |
351 | | static int id_create_ranged_impl( |
352 | | bthread_id_t* id, void* data, |
353 | | int (*on_error)(bthread_id_t, void*, int), |
354 | | int (*on_error2)(bthread_id_t, void*, int, const std::string&), |
355 | 0 | int range) { |
356 | 0 | if (range < 1 || range > ID_MAX_RANGE) { |
357 | 0 | LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range; |
358 | 0 | LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is " |
359 | 0 | << ID_MAX_RANGE << ", actually " << range; |
360 | 0 | return EINVAL; |
361 | 0 | } |
362 | 0 | IdResourceId slot; |
363 | 0 | Id* const meta = get_resource(&slot); |
364 | 0 | if (meta) { |
365 | 0 | meta->data = data; |
366 | 0 | meta->on_error = on_error; |
367 | 0 | meta->on_error2 = on_error2; |
368 | 0 | CHECK(meta->pending_q.empty()); |
369 | 0 | uint32_t* butex = meta->butex; |
370 | 0 | if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) { |
371 | | // Skip 0 so that bthread_id_t is never 0 |
372 | | // avoid overflow to make comparisons simpler. |
373 | 0 | *butex = 1; |
374 | 0 | } |
375 | 0 | *meta->join_butex = *butex; |
376 | 0 | meta->first_ver = *butex; |
377 | 0 | meta->locked_ver = *butex + range; |
378 | 0 | *id = make_id(*butex, slot); |
379 | 0 | return 0; |
380 | 0 | } |
381 | 0 | return ENOMEM; |
382 | 0 | } |
383 | | |
384 | | } // namespace bthread |
385 | | |
386 | | extern "C" { |
387 | | |
388 | | int bthread_id_create( |
389 | | bthread_id_t* id, void* data, |
390 | 0 | int (*on_error)(bthread_id_t, void*, int)) { |
391 | 0 | return bthread::id_create_impl( |
392 | 0 | id, data, |
393 | 0 | (on_error ? on_error : bthread::default_bthread_id_on_error), NULL); |
394 | 0 | } |
395 | | |
396 | | int bthread_id_create_ranged(bthread_id_t* id, void* data, |
397 | | int (*on_error)(bthread_id_t, void*, int), |
398 | 0 | int range) { |
399 | 0 | return bthread::id_create_ranged_impl( |
400 | 0 | id, data, |
401 | 0 | (on_error ? on_error : bthread::default_bthread_id_on_error), |
402 | 0 | NULL, range); |
403 | 0 | } |
404 | | |
// Lock `id', waiting while it is locked by someone else, and optionally
// enlarge its range of valid versions.
//
// pdata:    if not NULL, receives the user data of the id on success.
// range:    0 keeps the current range. A value in (0, ID_MAX_RANGE] that
//           moves locked_ver forward replaces the range. Any other value
//           is ignored (negative or too large values log FATAL); the lock
//           is still acquired in that case.
// location: recorded as the lock location of the id.
//
// Returns 0 on success;
//         EINVAL if the slot cannot be addressed or the version of `id'
//                is not (or no longer) valid;
//         EPERM  if the id was marked by bthread_id_about_to_destroy();
//         errno of butex_wait() when waiting fails with an error other
//                than EWOULDBLOCK and EINTR.
int bthread_id_lock_and_reset_range_verbose(
    bthread_id_t id, void **pdata, int range, const char *location) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (!meta) {
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
    uint32_t* butex = meta->butex;
    // Set once this caller had to wait; see the store to *butex below.
    bool ever_contended = false;
    meta->mutex.lock();
    // The version is re-validated on every iteration because the mutex is
    // released while waiting.
    while (meta->has_version(id_ver)) {
        if (*butex == meta->first_ver) {
            // Unlocked: take the lock.
            // contended locker always wakes up the butex at unlock.
            meta->lock_location = location;
            if (range == 0) {
                // fast path
            } else if (range < 0 ||
                       range > bthread::ID_MAX_RANGE ||
                       range + meta->first_ver <= meta->locked_ver) {
                // Invalid or non-growing range: keep the current one.
                LOG_IF(FATAL, range < 0) << "range must be positive, actually "
                                         << range;
                LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
                    << "max range is " << bthread::ID_MAX_RANGE
                    << ", actually " << range;
            } else {
                meta->locked_ver = meta->first_ver + range;
            }
            // A locker that ever waited stores contended_ver, which makes
            // bthread_id_unlock() wake the butex.
            *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
            meta->mutex.unlock();
            if (pdata) {
                *pdata = meta->data;
            }
            return 0;
        } else if (*butex != meta->unlockable_ver()) {
            // Locked by someone else: mark the lock as contended and wait
            // for the butex to change.
            *butex = meta->contended_ver();
            uint32_t expected_ver = *butex;
            meta->mutex.unlock();
            ever_contended = true;
            if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
                errno != EWOULDBLOCK && errno != EINTR) {
                return errno;
            }
            meta->mutex.lock();
        } else { // bthread_id_about_to_destroy was called.
            meta->mutex.unlock();
            return EPERM;
        }
    }
    meta->mutex.unlock();
    return EINVAL;
}
456 | | |
457 | | int bthread_id_error_verbose(bthread_id_t id, int error_code, |
458 | 0 | const char *location) { |
459 | 0 | return bthread_id_error2_verbose(id, error_code, std::string(), location); |
460 | 0 | } |
461 | | |
462 | 0 | int bthread_id_about_to_destroy(bthread_id_t id) { |
463 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
464 | 0 | if (!meta) { |
465 | 0 | return EINVAL; |
466 | 0 | } |
467 | 0 | const uint32_t id_ver = bthread::get_version(id); |
468 | 0 | uint32_t* butex = meta->butex; |
469 | 0 | meta->mutex.lock(); |
470 | 0 | if (!meta->has_version(id_ver)) { |
471 | 0 | meta->mutex.unlock(); |
472 | 0 | return EINVAL; |
473 | 0 | } |
474 | 0 | if (*butex == meta->first_ver) { |
475 | 0 | meta->mutex.unlock(); |
476 | 0 | LOG(FATAL) << "bthread_id=" << id.value << " is not locked!"; |
477 | 0 | return EPERM; |
478 | 0 | } |
479 | 0 | const bool contended = (*butex == meta->contended_ver()); |
480 | 0 | *butex = meta->unlockable_ver(); |
481 | 0 | meta->mutex.unlock(); |
482 | 0 | if (contended) { |
483 | | // wake up all waiting lockers. |
484 | 0 | bthread::butex_wake_except(butex, 0); |
485 | 0 | } |
486 | 0 | return 0; |
487 | 0 | } |
488 | | |
489 | 0 | int bthread_id_cancel(bthread_id_t id) { |
490 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
491 | 0 | if (!meta) { |
492 | 0 | return EINVAL; |
493 | 0 | } |
494 | 0 | uint32_t* butex = meta->butex; |
495 | 0 | const uint32_t id_ver = bthread::get_version(id); |
496 | 0 | meta->mutex.lock(); |
497 | 0 | if (!meta->has_version(id_ver)) { |
498 | 0 | meta->mutex.unlock(); |
499 | 0 | return EINVAL; |
500 | 0 | } |
501 | 0 | if (*butex != meta->first_ver) { |
502 | 0 | meta->mutex.unlock(); |
503 | 0 | return EPERM; |
504 | 0 | } |
505 | 0 | *butex = meta->end_ver(); |
506 | 0 | meta->first_ver = *butex; |
507 | 0 | meta->locked_ver = *butex; |
508 | 0 | meta->mutex.unlock(); |
509 | 0 | return_resource(bthread::get_slot(id)); |
510 | 0 | return 0; |
511 | 0 | } |
512 | | |
513 | 0 | int bthread_id_join(bthread_id_t id) { |
514 | 0 | const bthread::IdResourceId slot = bthread::get_slot(id); |
515 | 0 | bthread::Id* const meta = address_resource(slot); |
516 | 0 | if (!meta) { |
517 | | // The id is not created yet, this join is definitely wrong. |
518 | 0 | return EINVAL; |
519 | 0 | } |
520 | 0 | const uint32_t id_ver = bthread::get_version(id); |
521 | 0 | uint32_t* join_butex = meta->join_butex; |
522 | 0 | while (1) { |
523 | 0 | meta->mutex.lock(); |
524 | 0 | const bool has_ver = meta->has_version(id_ver); |
525 | 0 | const uint32_t expected_ver = *join_butex; |
526 | 0 | meta->mutex.unlock(); |
527 | 0 | if (!has_ver) { |
528 | 0 | break; |
529 | 0 | } |
530 | 0 | if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 && |
531 | 0 | errno != EWOULDBLOCK && errno != EINTR) { |
532 | 0 | return errno; |
533 | 0 | } |
534 | 0 | } |
535 | 0 | return 0; |
536 | 0 | } |
537 | | |
538 | 0 | int bthread_id_trylock(bthread_id_t id, void** pdata) { |
539 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
540 | 0 | if (!meta) { |
541 | 0 | return EINVAL; |
542 | 0 | } |
543 | 0 | uint32_t* butex = meta->butex; |
544 | 0 | const uint32_t id_ver = bthread::get_version(id); |
545 | 0 | meta->mutex.lock(); |
546 | 0 | if (!meta->has_version(id_ver)) { |
547 | 0 | meta->mutex.unlock(); |
548 | 0 | return EINVAL; |
549 | 0 | } |
550 | 0 | if (*butex != meta->first_ver) { |
551 | 0 | meta->mutex.unlock(); |
552 | 0 | return EBUSY; |
553 | 0 | } |
554 | 0 | *butex = meta->locked_ver; |
555 | 0 | meta->mutex.unlock(); |
556 | 0 | if (pdata != NULL) { |
557 | 0 | *pdata = meta->data; |
558 | 0 | } |
559 | 0 | return 0; |
560 | 0 | } |
561 | | |
562 | | int bthread_id_lock_verbose(bthread_id_t id, void** pdata, |
563 | 0 | const char *location) { |
564 | 0 | return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location); |
565 | 0 | } |
566 | | |
567 | 0 | int bthread_id_unlock(bthread_id_t id) { |
568 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
569 | 0 | if (!meta) { |
570 | 0 | return EINVAL; |
571 | 0 | } |
572 | 0 | uint32_t* butex = meta->butex; |
573 | | // Release fence makes sure all changes made before signal visible to |
574 | | // woken-up waiters. |
575 | 0 | const uint32_t id_ver = bthread::get_version(id); |
576 | 0 | meta->mutex.lock(); |
577 | 0 | if (!meta->has_version(id_ver)) { |
578 | 0 | meta->mutex.unlock(); |
579 | 0 | LOG(FATAL) << "Invalid bthread_id=" << id.value; |
580 | 0 | return EINVAL; |
581 | 0 | } |
582 | 0 | if (*butex == meta->first_ver) { |
583 | 0 | meta->mutex.unlock(); |
584 | 0 | LOG(FATAL) << "bthread_id=" << id.value << " is not locked!"; |
585 | 0 | return EPERM; |
586 | 0 | } |
587 | 0 | bthread::PendingError front; |
588 | 0 | if (meta->pending_q.pop(&front)) { |
589 | 0 | meta->lock_location = front.location; |
590 | 0 | meta->mutex.unlock(); |
591 | 0 | if (meta->on_error) { |
592 | 0 | return meta->on_error(front.id, meta->data, front.error_code); |
593 | 0 | } else { |
594 | 0 | return meta->on_error2(front.id, meta->data, front.error_code, |
595 | 0 | front.error_text); |
596 | 0 | } |
597 | 0 | } else { |
598 | 0 | const bool contended = (*butex == meta->contended_ver()); |
599 | 0 | *butex = meta->first_ver; |
600 | 0 | meta->mutex.unlock(); |
601 | 0 | if (contended) { |
602 | | // We may wake up already-reused id, but that's OK. |
603 | 0 | bthread::butex_wake(butex); |
604 | 0 | } |
605 | 0 | return 0; |
606 | 0 | } |
607 | 0 | } |
608 | | |
609 | 0 | int bthread_id_unlock_and_destroy(bthread_id_t id) { |
610 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
611 | 0 | if (!meta) { |
612 | 0 | return EINVAL; |
613 | 0 | } |
614 | 0 | uint32_t* butex = meta->butex; |
615 | 0 | uint32_t* join_butex = meta->join_butex; |
616 | 0 | const uint32_t id_ver = bthread::get_version(id); |
617 | 0 | meta->mutex.lock(); |
618 | 0 | if (!meta->has_version(id_ver)) { |
619 | 0 | meta->mutex.unlock(); |
620 | 0 | LOG(FATAL) << "Invalid bthread_id=" << id.value; |
621 | 0 | return EINVAL; |
622 | 0 | } |
623 | 0 | if (*butex == meta->first_ver) { |
624 | 0 | meta->mutex.unlock(); |
625 | 0 | LOG(FATAL) << "bthread_id=" << id.value << " is not locked!"; |
626 | 0 | return EPERM; |
627 | 0 | } |
628 | 0 | const uint32_t next_ver = meta->end_ver(); |
629 | 0 | *butex = next_ver; |
630 | 0 | *join_butex = next_ver; |
631 | 0 | meta->first_ver = next_ver; |
632 | 0 | meta->locked_ver = next_ver; |
633 | 0 | meta->pending_q.clear(); |
634 | 0 | meta->mutex.unlock(); |
635 | | // Notice that butex_wake* returns # of woken-up, not successful or not. |
636 | 0 | bthread::butex_wake_except(butex, 0); |
637 | 0 | bthread::butex_wake_all(join_butex); |
638 | 0 | return_resource(bthread::get_slot(id)); |
639 | 0 | return 0; |
640 | 0 | } |
641 | | |
642 | | int bthread_id_list_init(bthread_id_list_t* list, |
643 | | unsigned /*size*/, |
644 | 0 | unsigned /*conflict_size*/) { |
645 | 0 | list->impl = NULL; // create on demand. |
646 | | // Set unused fields to zero as well. |
647 | 0 | list->head = 0; |
648 | 0 | list->size = 0; |
649 | 0 | list->conflict_head = 0; |
650 | 0 | list->conflict_size = 0; |
651 | 0 | return 0; |
652 | 0 | } |
653 | | |
654 | 0 | void bthread_id_list_destroy(bthread_id_list_t* list) { |
655 | 0 | delete static_cast<bthread::IdList*>(list->impl); |
656 | 0 | list->impl = NULL; |
657 | 0 | } |
658 | | |
659 | 0 | int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) { |
660 | 0 | if (list->impl == NULL) { |
661 | 0 | list->impl = new (std::nothrow) bthread::IdList; |
662 | 0 | if (NULL == list->impl) { |
663 | 0 | return ENOMEM; |
664 | 0 | } |
665 | 0 | } |
666 | 0 | return static_cast<bthread::IdList*>(list->impl)->add(id); |
667 | 0 | } |
668 | | |
669 | 0 | int bthread_id_list_reset(bthread_id_list_t* list, int error_code) { |
670 | 0 | return bthread_id_list_reset2(list, error_code, std::string()); |
671 | 0 | } |
672 | | |
673 | | void bthread_id_list_swap(bthread_id_list_t* list1, |
674 | 0 | bthread_id_list_t* list2) { |
675 | 0 | std::swap(list1->impl, list2->impl); |
676 | 0 | } |
677 | | |
678 | | int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code, |
679 | 0 | pthread_mutex_t* mutex) { |
680 | 0 | return bthread_id_list_reset2_pthreadsafe( |
681 | 0 | list, error_code, std::string(), mutex); |
682 | 0 | } |
683 | | |
684 | | int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code, |
685 | 0 | bthread_mutex_t* mutex) { |
686 | 0 | return bthread_id_list_reset2_bthreadsafe( |
687 | 0 | list, error_code, std::string(), mutex); |
688 | 0 | } |
689 | | |
690 | | } // extern "C" |
691 | | |
692 | | int bthread_id_create2( |
693 | | bthread_id_t* id, void* data, |
694 | 0 | int (*on_error)(bthread_id_t, void*, int, const std::string&)) { |
695 | 0 | return bthread::id_create_impl( |
696 | 0 | id, data, NULL, |
697 | 0 | (on_error ? on_error : bthread::default_bthread_id_on_error2)); |
698 | 0 | } |
699 | | |
700 | | int bthread_id_create2_ranged( |
701 | | bthread_id_t* id, void* data, |
702 | | int (*on_error)(bthread_id_t, void*, int, const std::string&), |
703 | 0 | int range) { |
704 | 0 | return bthread::id_create_ranged_impl( |
705 | 0 | id, data, NULL, |
706 | 0 | (on_error ? on_error : bthread::default_bthread_id_on_error2), range); |
707 | 0 | } |
708 | | |
709 | | int bthread_id_error2_verbose(bthread_id_t id, int error_code, |
710 | | const std::string& error_text, |
711 | 0 | const char *location) { |
712 | 0 | bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
713 | 0 | if (!meta) { |
714 | 0 | return EINVAL; |
715 | 0 | } |
716 | 0 | const uint32_t id_ver = bthread::get_version(id); |
717 | 0 | uint32_t* butex = meta->butex; |
718 | 0 | meta->mutex.lock(); |
719 | 0 | if (!meta->has_version(id_ver)) { |
720 | 0 | meta->mutex.unlock(); |
721 | 0 | return EINVAL; |
722 | 0 | } |
723 | 0 | if (*butex == meta->first_ver) { |
724 | 0 | *butex = meta->locked_ver; |
725 | 0 | meta->lock_location = location; |
726 | 0 | meta->mutex.unlock(); |
727 | 0 | if (meta->on_error) { |
728 | 0 | return meta->on_error(id, meta->data, error_code); |
729 | 0 | } else { |
730 | 0 | return meta->on_error2(id, meta->data, error_code, error_text); |
731 | 0 | } |
732 | 0 | } else { |
733 | 0 | bthread::PendingError e; |
734 | 0 | e.id = id; |
735 | 0 | e.error_code = error_code; |
736 | 0 | e.error_text = error_text; |
737 | 0 | e.location = location; |
738 | 0 | meta->pending_q.push(e); |
739 | 0 | meta->mutex.unlock(); |
740 | 0 | return 0; |
741 | 0 | } |
742 | 0 | } |
743 | | |
744 | | int bthread_id_list_reset2(bthread_id_list_t* list, |
745 | | int error_code, |
746 | 0 | const std::string& error_text) { |
747 | 0 | if (list->impl != NULL) { |
748 | 0 | static_cast<bthread::IdList*>(list->impl)->apply( |
749 | 0 | bthread::IdResetter(error_code, error_text)); |
750 | 0 | } |
751 | 0 | return 0; |
752 | 0 | } |
753 | | |
754 | | int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list, |
755 | | int error_code, |
756 | | const std::string& error_text, |
757 | 0 | pthread_mutex_t* mutex) { |
758 | 0 | if (mutex == NULL) { |
759 | 0 | return EINVAL; |
760 | 0 | } |
761 | 0 | if (list->impl == NULL) { |
762 | 0 | return 0; |
763 | 0 | } |
764 | 0 | bthread_id_list_t tmplist; |
765 | 0 | const int rc = bthread_id_list_init(&tmplist, 0, 0); |
766 | 0 | if (rc != 0) { |
767 | 0 | return rc; |
768 | 0 | } |
769 | | // Swap out the list then reset. The critical section is very small. |
770 | 0 | pthread_mutex_lock(mutex); |
771 | 0 | std::swap(list->impl, tmplist.impl); |
772 | 0 | pthread_mutex_unlock(mutex); |
773 | 0 | const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text); |
774 | 0 | bthread_id_list_destroy(&tmplist); |
775 | 0 | return rc2; |
776 | 0 | } |
777 | | |
778 | | int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list, |
779 | | int error_code, |
780 | | const std::string& error_text, |
781 | 0 | bthread_mutex_t* mutex) { |
782 | 0 | if (mutex == NULL) { |
783 | 0 | return EINVAL; |
784 | 0 | } |
785 | 0 | if (list->impl == NULL) { |
786 | 0 | return 0; |
787 | 0 | } |
788 | 0 | bthread_id_list_t tmplist; |
789 | 0 | const int rc = bthread_id_list_init(&tmplist, 0, 0); |
790 | 0 | if (rc != 0) { |
791 | 0 | return rc; |
792 | 0 | } |
793 | | // Swap out the list then reset. The critical section is very small. |
794 | 0 | bthread_mutex_lock(mutex); |
795 | 0 | std::swap(list->impl, tmplist.impl); |
796 | 0 | bthread_mutex_unlock(mutex); |
797 | 0 | const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text); |
798 | 0 | bthread_id_list_destroy(&tmplist); |
799 | 0 | return rc2; |
800 | 0 | } |