Coverage Report

Created: 2026-01-09 06:11

/src/brpc/src/bthread/fd.cpp
Line | Count | Source
-----+-------+----------------------------------------------------------------------
   1 |       | // Licensed to the Apache Software Foundation (ASF) under one
   2 |       | // or more contributor license agreements.  See the NOTICE file
   3 |       | // distributed with this work for additional information
   4 |       | // regarding copyright ownership.  The ASF licenses this file
   5 |       | // to you under the Apache License, Version 2.0 (the
   6 |       | // "License"); you may not use this file except in compliance
   7 |       | // with the License.  You may obtain a copy of the License at
   8 |       | //
   9 |       | //   http://www.apache.org/licenses/LICENSE-2.0
  10 |       | //
  11 |       | // Unless required by applicable law or agreed to in writing,
  12 |       | // software distributed under the License is distributed on an
  13 |       | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14 |       | // KIND, either express or implied.  See the License for the
  15 |       | // specific language governing permissions and limitations
  16 |       | // under the License.
  17 |       |
  18 |       | // bthread - An M:N threading library to make applications more concurrent.
  19 |       |
  20 |       | // Date: Thu Aug  7 18:56:27 CST 2014
  21 |       |
  22 |       | #include "butil/compat.h"
  23 |       | #include <new>                                   // std::nothrow
  24 |       | #include <sys/poll.h>                            // poll()
  25 |       | #if defined(OS_MACOSX)
  26 |       | #include <sys/types.h>                           // struct kevent
  27 |       | #include <sys/event.h>                           // kevent(), kqueue()
  28 |       | #endif
  29 |       | #include "butil/atomicops.h"
  30 |       | #include "butil/time.h"
  31 |       | #include "butil/fd_utility.h"                     // make_non_blocking
  32 |       | #include "butil/logging.h"
  33 |       | #include "butil/third_party/murmurhash3/murmurhash3.h"   // fmix32
  34 |       | #include "butil/memory/scope_guard.h"
  35 |       | #include "bthread/butex.h"                       // butex_*
  36 |       | #include "bthread/task_group.h"                  // TaskGroup
  37 |       | #include "bthread/bthread.h"                             // bthread_start_urgent
  38 |       |
  39 |       | namespace butil {
  40 |       | extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime);
  41 |       | }
  42 |       |
  43 |       | // Implement bthread functions on file descriptors
  44 |       |
  45 |       | namespace bthread {
  46 |       |
  47 |       | extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
  48 |       |
  49 |       | template <typename T, size_t NBLOCK, size_t BLOCK_SIZE>
  50 |       | class LazyArray {
  51 |       |     struct Block {
  52 |       |         butil::atomic<T> items[BLOCK_SIZE];
  53 |       |     };
  54 |       |
  55 |       | public:
  56 |     2 |     LazyArray() {
  57 |     2 |         memset(static_cast<void*>(_blocks), 0, sizeof(butil::atomic<Block*>) * NBLOCK);
  58 |     2 |     }
  59 |       |
  60 |     0 |     butil::atomic<T>* get_or_new(size_t index) {
  61 |     0 |         const size_t block_index = index / BLOCK_SIZE;
  62 |     0 |         if (block_index >= NBLOCK) {
  63 |     0 |             return NULL;
  64 |     0 |         }
  65 |     0 |         const size_t block_offset = index - block_index * BLOCK_SIZE;
  66 |     0 |         Block* b = _blocks[block_index].load(butil::memory_order_consume);
  67 |     0 |         if (b != NULL) {
  68 |     0 |             return b->items + block_offset;
  69 |     0 |         }
  70 |     0 |         b = new (std::nothrow) Block;
  71 |     0 |         if (NULL == b) {
  72 |     0 |             b = _blocks[block_index].load(butil::memory_order_consume);
  73 |     0 |             return (b ? b->items + block_offset : NULL);
  74 |     0 |         }
  75 |       |         // Set items to default value of T.
  76 |     0 |         std::fill(b->items, b->items + BLOCK_SIZE, T());
  77 |     0 |         Block* expected = NULL;
  78 |     0 |         if (_blocks[block_index].compare_exchange_strong(
  79 |     0 |                 expected, b, butil::memory_order_release,
  80 |     0 |                 butil::memory_order_consume)) {
  81 |     0 |             return b->items + block_offset;
  82 |     0 |         }
  83 |     0 |         delete b;
  84 |     0 |         return expected->items + block_offset;
  85 |     0 |     }
  86 |       |
  87 |     0 |     butil::atomic<T>* get(size_t index) const {
  88 |     0 |         const size_t block_index = index / BLOCK_SIZE;
  89 |     0 |         if (__builtin_expect(block_index < NBLOCK, 1)) {
  90 |     0 |             const size_t block_offset = index - block_index * BLOCK_SIZE;
  91 |     0 |             Block* const b = _blocks[block_index].load(butil::memory_order_consume);
  92 |     0 |             if (__builtin_expect(b != NULL, 1)) {
  93 |     0 |                 return b->items + block_offset;
  94 |     0 |             }
  95 |     0 |         }
  96 |     0 |         return NULL;
  97 |     0 |     }
  98 |       |
  99 |       | private:
 100 |       |     butil::atomic<Block*> _blocks[NBLOCK];
 101 |       | };
 102 |       |
 103 |       | typedef butil::atomic<int> EpollButex;
 104 |       |
 105 |       | static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L;
 106 |       |
 107 |       | #ifndef NDEBUG
 108 |       | butil::static_atomic<int> break_nums = BUTIL_STATIC_ATOMIC_INIT(0);
 109 |       | #endif
 110 |       |
 111 |       | // Able to address 67108864 file descriptors, should be enough.
 112 |       | LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes;
 113 |       |
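
Note: the capacity advertised on line 111 is just the product of the template
arguments on line 112: NBLOCK * BLOCK_SIZE = 262144 * 256 = 67108864 = 2^26
slots, one per possible file descriptor. A compile-time restatement of that
arithmetic (a sketch, not part of fd.cpp):

    // Sketch: the "67108864 file descriptors" comment, checked at compile time.
    static_assert(262144UL * 256UL == 67108864UL,
                  "fd_butexes capacity = NBLOCK * BLOCK_SIZE");
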
 114 |       | static const int BTHREAD_DEFAULT_EPOLL_SIZE = 65536;
 115 |       |
 116 |       | class EpollThread {
 117 |       | public:
 118 |       |     EpollThread()
 119 |     2 |         : _epfd(-1)
 120 |     2 |         , _stop(false)
 121 |     2 |         , _tid(0) {
 122 |     2 |     }
 123 |       |
 124 |     0 |     int start(int epoll_size) {
 125 |     0 |         if (started()) {
 126 |     0 |             return -1;
 127 |     0 |         }
 128 |     0 |         _start_mutex.lock();
 129 |       |         // Double check
 130 |     0 |         if (started()) {
 131 |     0 |             _start_mutex.unlock();
 132 |     0 |             return -1;
 133 |     0 |         }
 134 |     0 | #if defined(OS_LINUX)
 135 |     0 |         _epfd = epoll_create(epoll_size);
 136 |       | #elif defined(OS_MACOSX)
 137 |       |         _epfd = kqueue();
 138 |       | #endif
 139 |     0 |         _start_mutex.unlock();
 140 |     0 |         if (_epfd < 0) {
 141 |     0 |             PLOG(FATAL) << "Fail to epoll_create/kqueue";
 142 |     0 |             return -1;
 143 |     0 |         }
 144 |     0 |         bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
 145 |     0 |         bthread_attr_set_name(&attr, "EpollThread::run_this");
 146 |     0 |         if (bthread_start_background(
 147 |     0 |                 &_tid, &attr, EpollThread::run_this, this) != 0) {
 148 |     0 |             close(_epfd);
 149 |     0 |             _epfd = -1;
 150 |     0 |             LOG(FATAL) << "Fail to create epoll bthread";
 151 |     0 |             return -1;
 152 |     0 |         }
 153 |     0 |         return 0;
 154 |     0 |     }
 155 |       |
 156 |       |     // Note: This function does not wake up suspended fd_wait. This is fine
 157 |       |     // since stop_and_join is only called on program's termination
 158 |       |     // (g_task_control.stop()), suspended bthreads do not block quit of
 159 |       |     // worker pthreads and completion of g_task_control.stop().
 160 |     0 |     int stop_and_join() {
 161 |     0 |         if (!started()) {
 162 |     0 |             return 0;
 163 |     0 |         }
 164 |       |         // No matter what this function returns, _epfd will be set to -1
 165 |       |         // (making started() false) to avoid latter stop_and_join() to
 166 |       |         // enter again.
 167 |     0 |         const int saved_epfd = _epfd;
 168 |     0 |         _epfd = -1;
 169 |       |
 170 |       |         // epoll_wait cannot be woken up by closing _epfd. We wake up
 171 |       |         // epoll_wait by inserting a fd continuously triggering EPOLLOUT.
 172 |       |         // Visibility of _stop: constant EPOLLOUT forces epoll_wait to see
 173 |       |         // _stop (to be true) finally.
 174 |     0 |         _stop = true;
 175 |     0 |         int closing_epoll_pipe[2];
 176 |     0 |         if (pipe(closing_epoll_pipe)) {
 177 |     0 |             PLOG(FATAL) << "Fail to create closing_epoll_pipe";
 178 |     0 |             return -1;
 179 |     0 |         }
 180 |     0 | #if defined(OS_LINUX)
 181 |     0 |         epoll_event evt = { EPOLLOUT, { NULL } };
 182 |     0 |         if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
 183 |     0 |                       closing_epoll_pipe[1], &evt) < 0) {
 184 |       | #elif defined(OS_MACOSX)
 185 |       |         struct kevent kqueue_event;
 186 |       |         EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
 187 |       |                 0, 0, NULL);
 188 |       |         if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
 189 |       | #endif
 190 |     0 |             PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
 191 |     0 |                         << saved_epfd;
 192 |     0 |             return -1;
 193 |     0 |         }
 194 |       |
 195 |     0 |         const int rc = bthread_join(_tid, NULL);
 196 |     0 |         if (rc) {
 197 |     0 |             LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
 198 |     0 |             return -1;
 199 |     0 |         }
 200 |     0 |         close(closing_epoll_pipe[0]);
 201 |     0 |         close(closing_epoll_pipe[1]);
 202 |     0 |         close(saved_epfd);
 203 |     0 |         return 0;
 204 |     0 |     }
 205 |       |
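
Note: stop_and_join() (lines 160-204) wakes the poller with a self-pipe trick:
a pipe's write end is writable while the pipe is not full, so a level-triggered
EPOLLOUT registration forces the blocked epoll_wait() to return and observe
_stop. A minimal standalone sketch of the same idea (Linux path only; names
are illustrative, not from fd.cpp):

    #include <sys/epoll.h>
    #include <unistd.h>

    // Force a thread blocked in epoll_wait(epfd, ...) to wake up: the write
    // end of an empty pipe reports EPOLLOUT immediately and keeps reporting
    // it (level-triggered) until it is deregistered or closed.
    static int wake_epoll(int epfd, int pipefd[2]) {
        if (pipe(pipefd) != 0) {
            return -1;
        }
        epoll_event evt = { EPOLLOUT, { NULL } };
        return epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[1], &evt);
        // The caller closes both ends after joining the poller, as
        // stop_and_join() does above.
    }
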
 206 |     0 |     int fd_wait(int fd, unsigned events, const timespec* abstime) {
 207 |     0 |         butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
 208 |     0 |         if (NULL == p) {
 209 |     0 |             errno = ENOMEM;
 210 |     0 |             return -1;
 211 |     0 |         }
 212 |       |
 213 |     0 |         EpollButex* butex = p->load(butil::memory_order_consume);
 214 |     0 |         if (NULL == butex) {
 215 |       |             // It is rare to wait on one file descriptor from multiple threads
 216 |       |             // simultaneously. Creating singleton by optimistic locking here
 217 |       |             // saves mutexes for each butex.
 218 |     0 |             butex = butex_create_checked<EpollButex>();
 219 |     0 |             butex->store(0, butil::memory_order_relaxed);
 220 |     0 |             EpollButex* expected = NULL;
 221 |     0 |             if (!p->compare_exchange_strong(expected, butex,
 222 |     0 |                                             butil::memory_order_release,
 223 |     0 |                                             butil::memory_order_consume)) {
 224 |     0 |                 butex_destroy(butex);
 225 |     0 |                 butex = expected;
 226 |     0 |             }
 227 |     0 |         }
 228 |       |
 229 |     0 |         while (butex == CLOSING_GUARD) {  // bthread_close() is running.
 230 |     0 |             if (sched_yield() < 0) {
 231 |     0 |                 return -1;
 232 |     0 |             }
 233 |     0 |             butex = p->load(butil::memory_order_consume);
 234 |     0 |         }
 235 |       |         // Save value of butex before adding to epoll because the butex may
 236 |       |         // be changed before butex_wait. No memory fence because EPOLL_CTL_MOD
 237 |       |         // and EPOLL_CTL_ADD shall have release fence.
 238 |     0 |         const int expected_val = butex->load(butil::memory_order_relaxed);
 239 |       |
 240 |     0 | #if defined(OS_LINUX)
 241 |       | # ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
 242 |       |         epoll_event evt = { events | EPOLLONESHOT, { butex } };
 243 |       |         if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
 244 |       |             if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
 245 |       |                     errno != EEXIST) {
 246 |       |                 PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
 247 |       |                 return -1;
 248 |       |             }
 249 |       |         }
 250 |       | # else
 251 |     0 |         epoll_event evt;
 252 |     0 |         evt.events = events;
 253 |     0 |         evt.data.fd = fd;
 254 |     0 |         if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
 255 |     0 |             errno != EEXIST) {
 256 |     0 |             PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
 257 |     0 |             return -1;
 258 |     0 |         }
 259 |     0 | # endif
 260 |       | #elif defined(OS_MACOSX)
 261 |       |         struct kevent kqueue_event;
 262 |       |         EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
 263 |       |                 0, 0, butex);
 264 |       |         if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
 265 |       |             PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
 266 |       |             return -1;
 267 |       |         }
 268 |       | #endif
 269 |     0 |         while (butex->load(butil::memory_order_relaxed) == expected_val) {
 270 |     0 |             if (butex_wait(butex, expected_val, abstime) < 0 &&
 271 |     0 |                 errno != EWOULDBLOCK && errno != EINTR) {
 272 |     0 |                 return -1;
 273 |     0 |             }
 274 |     0 |         }
 275 |     0 |         return 0;
 276 |     0 |     }
 277 |       |
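
Note: the comment on lines 215-217 describes lock-free lazy initialization:
create speculatively, publish with compare_exchange_strong, and let the loser
of the race destroy its copy. The same pattern reduced to its core, written
against std::atomic instead of the butil types used above (a sketch):

    #include <atomic>
    #include <cstddef>

    // Lazily create a shared T guarded only by an atomic pointer slot;
    // no mutex is ever taken.
    template <typename T>
    T* get_or_create(std::atomic<T*>& slot) {
        T* p = slot.load(std::memory_order_acquire);
        if (p != NULL) {
            return p;                        // already published
        }
        T* fresh = new T();                  // speculative creation
        T* expected = NULL;
        if (slot.compare_exchange_strong(expected, fresh,
                                         std::memory_order_release,
                                         std::memory_order_acquire)) {
            return fresh;                    // we published first
        }
        delete fresh;                        // lost the race
        return expected;                     // CAS stored the winner here
    }
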
 278 |     0 |     int fd_close(int fd) {
 279 |     0 |         if (fd < 0) {
 280 |       |             // what close(-1) returns
 281 |     0 |             errno = EBADF;
 282 |     0 |             return -1;
 283 |     0 |         }
 284 |     0 |         butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
 285 |     0 |         if (NULL == pbutex) {
 286 |       |             // Did not call bthread_fd functions, close directly.
 287 |     0 |             return close(fd);
 288 |     0 |         }
 289 |     0 |         EpollButex* butex = pbutex->exchange(
 290 |     0 |             CLOSING_GUARD, butil::memory_order_relaxed);
 291 |     0 |         if (butex == CLOSING_GUARD) {
 292 |       |             // concurrent double close detected.
 293 |     0 |             errno = EBADF;
 294 |     0 |             return -1;
 295 |     0 |         }
 296 |     0 |         if (butex != NULL) {
 297 |     0 |             butex->fetch_add(1, butil::memory_order_relaxed);
 298 |     0 |             butex_wake_all(butex);
 299 |     0 |         }
 300 |     0 | #if defined(OS_LINUX)
 301 |     0 |         epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
 302 |       | #elif defined(OS_MACOSX)
 303 |       |         struct kevent evt;
 304 |       |         EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
 305 |       |         kevent(_epfd, &evt, 1, NULL, 0, NULL);
 306 |       |         EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
 307 |       |         kevent(_epfd, &evt, 1, NULL, 0, NULL);
 308 |       | #endif
 309 |     0 |         const int rc = close(fd);
 310 |     0 |         pbutex->exchange(butex, butil::memory_order_relaxed);
 311 |     0 |         return rc;
 312 |     0 |     }
 313 |       |
 314 |     0 |     bool started() const {
 315 |     0 |         return _epfd >= 0;
 316 |     0 |     }
 317 |       |
 318 |       | private:
 319 |     0 |     static void* run_this(void* arg) {
 320 |     0 |         return static_cast<EpollThread*>(arg)->run();
 321 |     0 |     }
 322 |       |
 323 |     0 |     void* run() {
 324 |     0 |         const int initial_epfd = _epfd;
 325 |     0 |         const size_t MAX_EVENTS = 32;
 326 |     0 | #if defined(OS_LINUX)
 327 |     0 |         epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
 328 |       | #elif defined(OS_MACOSX)
 329 |       |         typedef struct kevent KEVENT;
 330 |       |         struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
 331 |       | #endif
 332 |     0 |         if (NULL == e) {
 333 |     0 |             LOG(FATAL) << "Fail to new epoll_event";
 334 |     0 |             return NULL;
 335 |     0 |         }
 336 |       |
 337 |     0 | #if defined(OS_LINUX)
 338 |     0 | # ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
 339 |     0 |         DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
 340 |     0 | # endif
 341 |     0 | #endif
 342 |     0 |         while (!_stop) {
 343 |     0 |             const int epfd = _epfd;
 344 |     0 | #if defined(OS_LINUX)
 345 |     0 |             const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
 346 |       | #elif defined(OS_MACOSX)
 347 |       |             const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
 348 |       | #endif
 349 |     0 |             if (_stop) {
 350 |     0 |                 break;
 351 |     0 |             }
 352 |       |
 353 |     0 |             if (n < 0) {
 354 |     0 |                 if (errno == EINTR) {
 355 |     0 | #ifndef NDEBUG
 356 |     0 |                     break_nums.fetch_add(1, butil::memory_order_relaxed);
 357 |     0 |                     int* p = &errno;
 358 |     0 |                     const char* b = berror();
 359 |     0 |                     const char* b2 = berror(errno);
 360 |     0 |                     DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
 361 |     0 |                                 << errno << " " << p << " " <<  b << " " <<  b2;
 362 |     0 | #endif
 363 |     0 |                     continue;
 364 |     0 |                 }
 365 |       |
 366 |     0 |                 PLOG(INFO) << "Fail to epoll epfd=" << epfd;
 367 |     0 |                 break;
 368 |     0 |             }
 369 |       |
 370 |     0 | #if defined(OS_LINUX)
 371 |     0 | # ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
 372 |     0 |             for (int i = 0; i < n; ++i) {
 373 |     0 |                 epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
 374 |     0 |             }
 375 |     0 | # endif
 376 |     0 | #endif
 377 |     0 |             for (int i = 0; i < n; ++i) {
 378 |     0 | #if defined(OS_LINUX)
 379 |       | # ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
 380 |       |                 EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
 381 |       | # else
 382 |     0 |                 butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
 383 |     0 |                 EpollButex* butex = pbutex ?
 384 |     0 |                     pbutex->load(butil::memory_order_consume) : NULL;
 385 |     0 | # endif
 386 |       | #elif defined(OS_MACOSX)
 387 |       |                 EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
 388 |       | #endif
 389 |     0 |                 if (butex != NULL && butex != CLOSING_GUARD) {
 390 |     0 |                     butex->fetch_add(1, butil::memory_order_relaxed);
 391 |     0 |                     butex_wake_all(butex);
 392 |     0 |                 }
 393 |     0 |             }
 394 |     0 |         }
 395 |       |
 396 |     0 |         delete [] e;
 397 |     0 |         DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
 398 |     0 |                    << initial_epfd << ") is about to stop";
 399 |     0 |         return NULL;
 400 |     0 |     }
 401 |       |
 402 |       |     int _epfd;
 403 |       |     bool _stop;
 404 |       |     bthread_t _tid;
 405 |       |     butil::Mutex _start_mutex;
 406 |       | };
 407 |       |
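
Note: run() (lines 323-400) and fd_wait() (lines 206-276) cooperate in one of
two registration modes, selected by BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG. On
fixed kernels a fd stays registered with EPOLLONESHOT and is re-armed by
EPOLL_CTL_MOD; on buggy kernels fd_wait() does a plain ADD and run() must DEL
the fd after every event, which is what the DLOG on line 339 warns about. A
sketch of the two re-arm paths (Linux; bodies illustrative, not from fd.cpp):

    #include <sys/epoll.h>

    // (a) Fixed kernels: a one-shot registration disarms itself after each
    // event; MOD re-arms it and ADD runs only on first use, so the steady
    // state costs one syscall per wait.
    static int rearm_oneshot(int epfd, int fd, unsigned events, void* butex) {
        epoll_event evt = { events | EPOLLONESHOT, { butex } };
        if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &evt);
        }
        return 0;
    }

    // (b) Buggy kernels: the poller deregisters the fd after every event and
    // the next fd_wait() adds it back. Two syscalls per wait.
    static void rearm_workaround(int epfd, int fd) {
        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    }
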
 408 |       | EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM];
 409 |       |
 410 |     0 | static inline EpollThread& get_epoll_thread(int fd) {
 411 |     0 |     if (BTHREAD_EPOLL_THREAD_NUM == 1UL) {
 412 |     0 |         EpollThread& et = epoll_thread[0];
 413 |     0 |         et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
 414 |     0 |         return et;
 415 |     0 |     }
 416 |       |
 417 |     0 |     EpollThread& et = epoll_thread[butil::fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM];
 418 |     0 |     et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
 419 |     0 |     return et;
 420 |     0 | }
 421 |       |
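
Note: get_epoll_thread() (lines 410-420) shards fds across the pollers by
hashing with butil::fmix32, the 32-bit finalizer of MurmurHash3, so that
consecutive fds scatter across BTHREAD_EPOLL_THREAD_NUM threads instead of
piling onto one. For reference, the finalizer (fd.cpp pulls it in from
murmurhash3.h; reproduced here as a sketch):

    #include <stdint.h>

    // MurmurHash3 fmix32: mixes all input bits so nearby inputs
    // (e.g. consecutive fds) map to well-scattered outputs.
    static inline uint32_t fmix32(uint32_t h) {
        h ^= h >> 16;
        h *= 0x85ebca6bU;
        h ^= h >> 13;
        h *= 0xc2b2ae35U;
        h ^= h >> 16;
        return h;
    }
    // Owner of fd: epoll_thread[fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM]
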
 422 |       | //TODO(zhujiashun): change name
 423 |     0 | int stop_and_join_epoll_threads() {
 424 |       |     // Returns -1 if any epoll thread failed to stop.
 425 |     0 |     int rc = 0;
 426 |     0 |     for (size_t i = 0; i < BTHREAD_EPOLL_THREAD_NUM; ++i) {
 427 |     0 |         if (epoll_thread[i].stop_and_join() < 0) {
 428 |     0 |             rc = -1;
 429 |     0 |         }
 430 |     0 |     }
 431 |     0 |     return rc;
 432 |     0 | }
 433 |       |
 434 |       | // For pthreads.
 435 |       | int pthread_fd_wait(int fd, unsigned events,
 436 |     0 |                     const timespec* abstime) {
 437 |     0 |     return butil::pthread_fd_wait(fd, events, abstime);
 438 |     0 | }
 439 |       |
 440 |       | }  // namespace bthread
 441 |       |
 442 |       | extern "C" {
 443 |       |
 444 |     0 | int bthread_fd_wait(int fd, unsigned events) {
 445 |     0 |     if (fd < 0) {
 446 |     0 |         errno = EINVAL;
 447 |     0 |         return -1;
 448 |     0 |     }
 449 |     0 |     bthread::TaskGroup* g = bthread::tls_task_group;
 450 |     0 |     if (NULL != g && !g->is_current_pthread_task()) {
 451 |     0 |         return bthread::get_epoll_thread(fd).fd_wait(
 452 |     0 |             fd, events, NULL);
 453 |     0 |     }
 454 |     0 |     return bthread::pthread_fd_wait(fd, events, NULL);
 455 |     0 | }
 456 |       |
 457 |       | int bthread_fd_timedwait(int fd, unsigned events,
 458 |     0 |                          const timespec* abstime) {
 459 |     0 |     if (NULL == abstime) {
 460 |     0 |         return bthread_fd_wait(fd, events);
 461 |     0 |     }
 462 |     0 |     if (fd < 0) {
 463 |     0 |         errno = EINVAL;
 464 |     0 |         return -1;
 465 |     0 |     }
 466 |     0 |     bthread::TaskGroup* g = bthread::tls_task_group;
 467 |     0 |     if (NULL != g && !g->is_current_pthread_task()) {
 468 |     0 |         return bthread::get_epoll_thread(fd).fd_wait(
 469 |     0 |             fd, events, abstime);
 470 |     0 |     }
 471 |     0 |     return bthread::pthread_fd_wait(fd, events, abstime);
 472 |     0 | }
 473 |       |
 474 |       | int bthread_connect(int sockfd, const sockaddr* serv_addr,
 475 |       |                     socklen_t addrlen) {
 476 |       |     bthread::TaskGroup* g = bthread::tls_task_group;
 477 |       |     if (NULL == g || g->is_current_pthread_task()) {
 478 |       |         return ::connect(sockfd, serv_addr, addrlen);
 479 |       |     }
 480 |       |
 481 |       |     bool is_blocking = butil::is_blocking(sockfd);
 482 |       |     if (is_blocking) {
 483 |       |         butil::make_non_blocking(sockfd);
 484 |       |     }
 485 |       |     // Scoped non-blocking.
 486 |     0 |     auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
 487 |     0 |         if (is_blocking) {
 488 |     0 |             butil::make_blocking(sockfd);
 489 |     0 |         }
 490 |     0 |     });
 491 |       |
 492 |       |     const int rc = ::connect(sockfd, serv_addr, addrlen);
 493 |       |     if (rc == 0 || errno != EINPROGRESS) {
 494 |       |         return rc;
 495 |       |     }
 496 |       | #if defined(OS_LINUX)
 497 |       |     if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) {
 498 |       | #elif defined(OS_MACOSX)
 499 |       |     if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) {
 500 |       | #endif
 501 |       |         return -1;
 502 |       |     }
 503 |       |
 504 |       |     if (butil::is_connected(sockfd) != 0) {
 505 |       |         return -1;
 506 |       |     }
 507 |       |
 508 |       |     return 0;
 509 |       | }
 510 |       |
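
Note: bthread_connect() (lines 474-509) is a drop-in for connect(2): on a
worker it turns EINPROGRESS into a suspension on EPOLLOUT (EVFILT_WRITE on
macOS) and then re-checks the socket with butil::is_connected(). A
hypothetical call site (address, port and helper name are made up):

    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include "bthread/unstable.h"   // bthread_connect (assumed header path)

    // Connect from bthread code without blocking the worker pthread.
    int connect_to_local_8080() {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
            return -1;
        }
        sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port = htons(8080);                      // hypothetical port
        inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);  // hypothetical peer
        if (bthread_connect(fd, (sockaddr*)&addr, sizeof(addr)) != 0) {
            close(fd);  // error path
            return -1;
        }
        return fd;      // close later with bthread_close() to wake waiters
    }
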
 511 |       | int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
 512 |     0 |                           socklen_t addrlen, const timespec* abstime) {
 513 |     0 |     if (!abstime) {
 514 |     0 |         return bthread_connect(sockfd, serv_addr, addrlen);
 515 |     0 |     }
 516 |       |
 517 |     0 |     bool is_blocking = butil::is_blocking(sockfd);
 518 |     0 |     if (is_blocking) {
 519 |     0 |         butil::make_non_blocking(sockfd);
 520 |     0 |     }
 521 |       |     // Scoped non-blocking.
 522 |     0 |     auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
 523 |     0 |         if (is_blocking) {
 524 |     0 |             butil::make_blocking(sockfd);
 525 |     0 |         }
 526 |     0 |     });
 527 |       |
 528 |     0 |     const int rc = ::connect(sockfd, serv_addr, addrlen);
 529 |     0 |     if (rc == 0 || errno != EINPROGRESS) {
 530 |     0 |         return rc;
 531 |     0 |     }
 532 |     0 | #if defined(OS_LINUX)
 533 |     0 |     if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) {
 534 |       | #elif defined(OS_MACOSX)
 535 |       |     if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) {
 536 |       | #endif
 537 |     0 |         return -1;
 538 |     0 |     }
 539 |       |
 540 |     0 |     if (butil::is_connected(sockfd) != 0) {
 541 |     0 |         return -1;
 542 |     0 |     }
 543 |       |
 544 |     0 |     return 0;
 545 |     0 | }
 546 |       |
 547 |       | // This does not wake pthreads calling bthread_fd_*wait.
 548 |     0 | int bthread_close(int fd) {
 549 |     0 |     return bthread::get_epoll_thread(fd).fd_close(fd);
 550 |     0 | }
 551 |       |
 552 |       | }  // extern "C"
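
Note: the extern "C" wrappers dispatch to EpollThread::fd_wait() when called
from a bthread worker and to butil::pthread_fd_wait() otherwise, so the same
call works in both contexts. A hypothetical read loop built on them (helper
name is made up; the declaration is assumed to come from bthread's headers):

    #include <errno.h>
    #include <sys/epoll.h>
    #include <unistd.h>
    #include "bthread/unstable.h"   // bthread_fd_wait (assumed header path)

    // Read from a non-blocking fd, suspending the calling bthread instead of
    // spinning on EAGAIN.
    ssize_t read_suspending(int fd, void* buf, size_t len) {
        ssize_t n;
        while ((n = read(fd, buf, len)) < 0) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                return -1;
            }
            if (bthread_fd_wait(fd, EPOLLIN) < 0) {
                return -1;
            }
        }
        return n;
    }
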