Coverage Report

Created: 2025-08-05 06:45

/src/brpc/src/bthread/fd.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// bthread - An M:N threading library to make applications more concurrent.
19
20
// Date: Thu Aug  7 18:56:27 CST 2014
21
22
#include "butil/compat.h"
23
#include <new>                                   // std::nothrow
24
#include <sys/poll.h>                            // poll()
25
#if defined(OS_MACOSX)
26
#include <sys/types.h>                           // struct kevent
27
#include <sys/event.h>                           // kevent(), kqueue()
28
#endif
29
#include "butil/atomicops.h"
30
#include "butil/time.h"
31
#include "butil/fd_utility.h"                     // make_non_blocking
32
#include "butil/logging.h"
33
#include "butil/third_party/murmurhash3/murmurhash3.h"   // fmix32
34
#include "butil/memory/scope_guard.h"
35
#include "bthread/butex.h"                       // butex_*
36
#include "bthread/task_group.h"                  // TaskGroup
37
#include "bthread/bthread.h"                             // bthread_start_urgent
38
39
// Forward declaration: blocking fd-wait fallback used when the caller runs
// on a plain pthread rather than inside a bthread worker (defined in butil).
namespace butil {
extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime);
}

// Implement bthread functions on file descriptors

namespace bthread {

// Worker-thread-local pointer to the current TaskGroup; NULL on threads that
// are not bthread workers (used below to pick bthread vs. pthread waiting).
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
48
49
// A sparse, grow-only array of butil::atomic<T> addressed by index.
// Storage is split into NBLOCK lazily-allocated blocks of BLOCK_SIZE items:
// a block is allocated on the first get_or_new() touching it (installed via
// lock-free CAS) and is never freed afterwards, so returned slot pointers
// stay valid for the lifetime of the LazyArray.
template <typename T, size_t NBLOCK, size_t BLOCK_SIZE>
class LazyArray {
    struct Block {
        butil::atomic<T> items[BLOCK_SIZE];
    };

public:
    LazyArray() {
        // Zero all block pointers so get()/get_or_new() observe NULL for
        // blocks that have not been allocated yet.
        memset(static_cast<void*>(_blocks), 0, sizeof(butil::atomic<Block*>) * NBLOCK);
    }

    // Return the slot for `index`, allocating its block on demand.
    // Returns NULL when index is out of range or allocation fails.
    butil::atomic<T>* get_or_new(size_t index) {
        const size_t block_index = index / BLOCK_SIZE;
        if (block_index >= NBLOCK) {
            return NULL;
        }
        const size_t block_offset = index - block_index * BLOCK_SIZE;
        Block* b = _blocks[block_index].load(butil::memory_order_consume);
        if (b != NULL) {
            return b->items + block_offset;
        }
        b = new (std::nothrow) Block;
        if (NULL == b) {
            // Our allocation failed, but a concurrent caller may have
            // installed the block meanwhile — re-check before giving up.
            b = _blocks[block_index].load(butil::memory_order_consume);
            return (b ? b->items + block_offset : NULL);
        }
        // Set items to default value of T.
        std::fill(b->items, b->items + BLOCK_SIZE, T());
        Block* expected = NULL;
        if (_blocks[block_index].compare_exchange_strong(
                expected, b, butil::memory_order_release,
                butil::memory_order_consume)) {
            return b->items + block_offset;
        }
        // Lost the installation race: discard our block and use the
        // winner's (CAS left it in `expected`).
        delete b;
        return expected->items + block_offset;
    }

    // Read-only lookup: returns NULL when the index is out of range or its
    // block has not been allocated yet. Never allocates.
    butil::atomic<T>* get(size_t index) const {
        const size_t block_index = index / BLOCK_SIZE;
        if (__builtin_expect(block_index < NBLOCK, 1)) {
            const size_t block_offset = index - block_index * BLOCK_SIZE;
            Block* const b = _blocks[block_index].load(butil::memory_order_consume);
            if (__builtin_expect(b != NULL, 1)) {
                return b->items + block_offset;
            }
        }
        return NULL;
    }

private:
    // Top-level table of lazily-created blocks; entries start as NULL.
    butil::atomic<Block*> _blocks[NBLOCK];
};
102
103
// Per-fd wait word: a butex (bthread futex) whose value is incremented to
// wake every bthread currently waiting on the corresponding fd.
typedef butil::atomic<int> EpollButex;

// Sentinel stored into fd_butexes while fd_close() is in progress; fd_wait()
// spins (sched_yield) until the sentinel is replaced. Also used to detect a
// concurrent double close.
static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L;

#ifndef NDEBUG
// Debug-only counter of EINTR wakeups seen by the epoll thread's wait loop.
butil::static_atomic<int> break_nums = BUTIL_STATIC_ATOMIC_INIT(0);
#endif

// Able to address 67108864 file descriptors, should be enough.
LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes;

// Size hint passed to epoll_create() when an EpollThread starts.
static const int BTHREAD_DEFAULT_EPOLL_SIZE = 65536;
115
116
// A polling shard: owns one epoll (Linux) / kqueue (macOS) fd plus a
// background bthread that waits on it and wakes suspended fd waiters by
// bumping and waking their per-fd butexes (see fd_butexes above).
class EpollThread {
public:
    EpollThread()
        : _epfd(-1)
        , _stop(false)
        , _tid(0) {
    }

    // Create the epoll/kqueue fd and spawn the polling bthread.
    // Returns 0 on success; -1 if already started or on failure.
    int start(int epoll_size) {
        if (started()) {
            return -1;
        }
        _start_mutex.lock();
        // Double check
        if (started()) {
            _start_mutex.unlock();
            return -1;
        }
#if defined(OS_LINUX)
        _epfd = epoll_create(epoll_size);
#elif defined(OS_MACOSX)
        _epfd = kqueue();
#endif
        _start_mutex.unlock();
        if (_epfd < 0) {
            PLOG(FATAL) << "Fail to epoll_create/kqueue";
            return -1;
        }
        if (bthread_start_background(
                &_tid, NULL, EpollThread::run_this, this) != 0) {
            close(_epfd);
            _epfd = -1;
            LOG(FATAL) << "Fail to create epoll bthread";
            return -1;
        }
        return 0;
    }

    // Note: This function does not wake up suspended fd_wait. This is fine
    // since stop_and_join is only called on program's termination
    // (g_task_control.stop()), suspended bthreads do not block quit of
    // worker pthreads and completion of g_task_control.stop().
    int stop_and_join() {
        if (!started()) {
            return 0;
        }
        // No matter what this function returns, _epfd will be set to -1
        // (making started() false) to avoid latter stop_and_join() to
        // enter again.
        const int saved_epfd = _epfd;
        _epfd = -1;

        // epoll_wait cannot be woken up by closing _epfd. We wake up
        // epoll_wait by inserting a fd continuously triggering EPOLLOUT.
        // Visibility of _stop: constant EPOLLOUT forces epoll_wait to see
        // _stop (to be true) finally.
        _stop = true;
        int closing_epoll_pipe[2];
        if (pipe(closing_epoll_pipe)) {
            PLOG(FATAL) << "Fail to create closing_epoll_pipe";
            return -1;
        }
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
                      closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
                0, 0, NULL);
        if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
            // NOTE(review): closing_epoll_pipe fds (and saved_epfd) are not
            // closed on this error path — a leak, though only reachable
            // once, at program termination. Worth confirming/fixing upstream.
            PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
                        << saved_epfd;
            return -1;
        }

        // The write end of the pipe stays writable, so epoll_wait/kevent
        // returns immediately and run() observes _stop == true.
        const int rc = bthread_join(_tid, NULL);
        if (rc) {
            LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
            return -1;
        }
        close(closing_epoll_pipe[0]);
        close(closing_epoll_pipe[1]);
        close(saved_epfd);
        return 0;
    }

    // Suspend the calling bthread until `events` fire on `fd`, the fd is
    // closed via fd_close(), or `abstime` (absolute, may be NULL = forever)
    // expires. Returns 0 on wakeup, -1 with errno set on error.
    int fd_wait(int fd, unsigned events, const timespec* abstime) {
        butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
        if (NULL == p) {
            errno = ENOMEM;
            return -1;
        }

        EpollButex* butex = p->load(butil::memory_order_consume);
        if (NULL == butex) {
            // It is rare to wait on one file descriptor from multiple threads
            // simultaneously. Creating singleton by optimistic locking here
            // saves mutexes for each butex.
            butex = butex_create_checked<EpollButex>();
            butex->store(0, butil::memory_order_relaxed);
            EpollButex* expected = NULL;
            if (!p->compare_exchange_strong(expected, butex,
                                            butil::memory_order_release,
                                            butil::memory_order_consume)) {
                butex_destroy(butex);
                butex = expected;
            }
        }

        while (butex == CLOSING_GUARD) {  // bthread_close() is running.
            if (sched_yield() < 0) {
                return -1;
            }
            butex = p->load(butil::memory_order_consume);
        }
        // Save value of butex before adding to epoll because the butex may
        // be changed before butex_wait. No memory fence because EPOLL_CTL_MOD
        // and EPOLL_CTL_ADD shall have release fence.
        const int expected_val = butex->load(butil::memory_order_relaxed);

#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        // Kernel supports EPOLLONESHOT correctly: try MOD first (fd usually
        // already registered), fall back to ADD for the first wait.
        epoll_event evt = { events | EPOLLONESHOT, { butex } };
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
                    errno != EEXIST) {
                PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
                return -1;
            }
        }
# else
        // Buggy-kernel path: plain ADD here, paired with DEL after each
        // wakeup in run(). EEXIST means another waiter already added the fd.
        epoll_event evt;
        evt.events = events;
        evt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
            errno != EEXIST) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
            return -1;
        }
# endif
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
                0, 0, butex);
        if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
            return -1;
        }
#endif
        // Wait until run() (or fd_close) bumps the butex past expected_val.
        // EWOULDBLOCK = value already changed; EINTR = spurious interrupt:
        // both mean "re-check", anything else is a real error (e.g. ETIMEDOUT).
        while (butex->load(butil::memory_order_relaxed) == expected_val) {
            if (butex_wait(butex, expected_val, abstime) < 0 &&
                errno != EWOULDBLOCK && errno != EINTR) {
                return -1;
            }
        }
        return 0;
    }

    // Close `fd`, waking all bthreads blocked in fd_wait() on it first.
    // Returns close()'s result, or -1/EBADF for negative fds and concurrent
    // double closes.
    int fd_close(int fd) {
        if (fd < 0) {
            // what close(-1) returns
            errno = EBADF;
            return -1;
        }
        butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
        if (NULL == pbutex) {
            // Did not call bthread_fd functions, close directly.
            return close(fd);
        }
        // Park CLOSING_GUARD in the slot so new fd_wait() calls spin until
        // the close completes.
        EpollButex* butex = pbutex->exchange(
            CLOSING_GUARD, butil::memory_order_relaxed);
        if (butex == CLOSING_GUARD) {
            // concurrent double close detected.
            errno = EBADF;
            return -1;
        }
        if (butex != NULL) {
            // Bump the value so waiters see a change, then wake them all.
            butex->fetch_add(1, butil::memory_order_relaxed);
            butex_wake_all(butex);
        }
#if defined(OS_LINUX)
        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
        struct kevent evt;
        EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
        EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
        const int rc = close(fd);
        // Restore the original butex pointer so the slot is reusable when
        // the fd number is recycled.
        pbutex->exchange(butex, butil::memory_order_relaxed);
        return rc;
    }

    bool started() const {
        return _epfd >= 0;
    }

private:
    // bthread entry point: trampoline to the member run().
    static void* run_this(void* arg) {
        return static_cast<EpollThread*>(arg)->run();
    }

    // Event loop of the polling bthread: wait for readiness events and wake
    // the butex attached to each ready fd. Exits when _stop becomes true
    // (see stop_and_join()).
    void* run() {
        const int initial_epfd = _epfd;
        const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
        epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
        typedef struct kevent KEVENT;
        struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
        if (NULL == e) {
            LOG(FATAL) << "Fail to new epoll_event";
            return NULL;
        }

#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
# endif
#endif
        while (!_stop) {
            // Re-read _epfd each iteration: stop_and_join() sets it to -1.
            const int epfd = _epfd;
#if defined(OS_LINUX)
            const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
            const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
            if (_stop) {
                break;
            }

            if (n < 0) {
                if (errno == EINTR) {
#ifndef NDEBUG
                    break_nums.fetch_add(1, butil::memory_order_relaxed);
                    int* p = &errno;
                    const char* b = berror();
                    const char* b2 = berror(errno);
                    DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
                                << errno << " " << p << " " <<  b << " " <<  b2;
#endif
                    continue;
                }

                PLOG(INFO) << "Fail to epoll epfd=" << epfd;
                break;
            }

#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
            // Buggy-kernel path: deregister each ready fd; the next fd_wait()
            // re-adds it (pairs with the ADD in fd_wait()).
            for (int i = 0; i < n; ++i) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
            }
# endif
#endif
            for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
                // The butex pointer was stashed in the event by fd_wait().
                EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
# else
                // Only the fd was stashed; look its butex up again.
                butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
                EpollButex* butex = pbutex ?
                    pbutex->load(butil::memory_order_consume) : NULL;
# endif
#elif defined(OS_MACOSX)
                EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#endif
                if (butex != NULL && butex != CLOSING_GUARD) {
                    // Change the value then wake all waiters (fd_close does
                    // the same when it races with us).
                    butex->fetch_add(1, butil::memory_order_relaxed);
                    butex_wake_all(butex);
                }
            }
        }

        delete [] e;
        DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
                   << initial_epfd << ") is about to stop";
        return NULL;
    }

    int _epfd;                  // epoll/kqueue fd; -1 means "not started"
    bool _stop;                 // set by stop_and_join() to end run()'s loop
    bthread_t _tid;             // id of the polling bthread
    butil::Mutex _start_mutex;  // serializes the lazy start()
};
405
406
// One EpollThread per shard; fds are hashed onto shards so waiters on
// different fds spread across multiple epoll instances.
EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM];

// Map `fd` to its shard's EpollThread and lazily start it (start() returns
// -1 once already started, which is deliberately ignored here).
static inline EpollThread& get_epoll_thread(int fd) {
    if (BTHREAD_EPOLL_THREAD_NUM == 1UL) {
        // Single-shard configuration: skip the hash.
        EpollThread& et = epoll_thread[0];
        et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
        return et;
    }

    // fmix32 scrambles the fd so consecutive fds land on different shards.
    EpollThread& et = epoll_thread[butil::fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM];
    et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
    return et;
}
419
420
//TODO(zhujiashun): change name
421
0
int stop_and_join_epoll_threads() {
422
    // Returns -1 if any epoll thread failed to stop.
423
0
    int rc = 0;
424
0
    for (size_t i = 0; i < BTHREAD_EPOLL_THREAD_NUM; ++i) {
425
0
        if (epoll_thread[i].stop_and_join() < 0) {
426
0
            rc = -1;
427
0
        }
428
0
    }
429
0
    return rc;
430
0
}
431
432
// For pthreads.
// Thin forwarder to butil::pthread_fd_wait(), which blocks the calling
// pthread (rather than suspending a bthread) until `events` fire on `fd`
// or `abstime` (absolute time, may be NULL) expires.
int pthread_fd_wait(int fd, unsigned events,
                    const timespec* abstime) {
    return butil::pthread_fd_wait(fd, events, abstime);
}
437
438
}  // namespace bthread
439
440
extern "C" {
441
442
0
// Block until any of `events` fires on `fd`. When called from a bthread,
// only the calling bthread is suspended (via the shard's EpollThread);
// otherwise the calling pthread blocks. Returns 0 on wakeup, -1 with errno
// set on error (EINVAL for negative fds).
int bthread_fd_wait(int fd, unsigned events) {
    if (fd < 0) {
        errno = EINVAL;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        // Running inside a bthread: park on the epoll thread so the worker
        // pthread stays free to run other bthreads.
        return bthread::get_epoll_thread(fd).fd_wait(
            fd, events, NULL);
    }
    // Not a bthread (or a pthread-mode task): fall back to blocking wait.
    return bthread::pthread_fd_wait(fd, events, NULL);
}
454
455
// Timed variant of bthread_fd_wait(): wait until `events` fire on `fd` or
// the absolute deadline `abstime` passes. NULL abstime degrades to the
// untimed wait. Returns 0 on wakeup, -1 with errno set on error.
int bthread_fd_timedwait(int fd, unsigned events,
                         const timespec* abstime) {
    if (NULL == abstime) {
        return bthread_fd_wait(fd, events);
    }
    if (fd < 0) {
        errno = EINVAL;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        // In a bthread: suspend only this bthread until event or deadline.
        return bthread::get_epoll_thread(fd).fd_wait(
            fd, events, abstime);
    }
    // Plain pthread (or pthread-mode task): blocking timed wait.
    return bthread::pthread_fd_wait(fd, events, abstime);
}
471
472
// connect() replacement that, when called from a bthread, suspends only the
// calling bthread (not the worker pthread) while the connection is in
// progress. The socket's blocking mode is restored before returning.
// Returns 0 on success, -1 with errno set on failure.
int bthread_connect(int sockfd, const sockaddr* serv_addr,
                    socklen_t addrlen) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        // Not in a bthread worker: a plain blocking connect is fine.
        return ::connect(sockfd, serv_addr, addrlen);
    }

    // Temporarily make a blocking socket non-blocking so connect() returns
    // EINPROGRESS instead of blocking the worker pthread.
    bool is_blocking = butil::is_blocking(sockfd);
    if (is_blocking) {
        butil::make_non_blocking(sockfd);
    }
    // Scoped non-blocking.
    auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
        if (is_blocking) {
            butil::make_blocking(sockfd);
        }
    });

    const int rc = ::connect(sockfd, serv_addr, addrlen);
    if (rc == 0 || errno != EINPROGRESS) {
        // Immediate success, or a failure other than "in progress".
        return rc;
    }
    // Wait (suspending only this bthread) until the socket is writable,
    // which signals completion of the asynchronous connect.
#if defined(OS_LINUX)
    if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) {
#elif defined(OS_MACOSX)
    if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) {
#endif
        return -1;
    }

    // Writability alone does not prove success; is_connected() verifies.
    if (butil::is_connected(sockfd) != 0) {
        return -1;
    }

    return 0;
}
508
509
// Timed variant of bthread_connect(): give up when the absolute deadline
// `abstime` passes while the connect is still in progress. NULL abstime
// degrades to bthread_connect(). The socket's blocking mode is restored
// before returning. Returns 0 on success, -1 with errno set on failure.
int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
                          socklen_t addrlen, const timespec* abstime) {
    if (!abstime) {
        return bthread_connect(sockfd, serv_addr, addrlen);
    }

    // Temporarily make a blocking socket non-blocking so connect() returns
    // EINPROGRESS instead of blocking.
    bool is_blocking = butil::is_blocking(sockfd);
    if (is_blocking) {
        butil::make_non_blocking(sockfd);
    }
    // Scoped non-blocking.
    auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
        if (is_blocking) {
            butil::make_blocking(sockfd);
        }
    });

    const int rc = ::connect(sockfd, serv_addr, addrlen);
    if (rc == 0 || errno != EINPROGRESS) {
        // Immediate success, or a failure other than "in progress".
        return rc;
    }
    // Wait for writability (connect completion) up to the deadline.
#if defined(OS_LINUX)
    if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) {
#elif defined(OS_MACOSX)
    if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) {
#endif
        return -1;
    }

    // Writability alone does not prove success; is_connected() verifies.
    if (butil::is_connected(sockfd) != 0) {
        return -1;
    }

    return 0;
}
544
545
// This does not wake pthreads calling bthread_fd_*wait.
// Close `fd` via its shard's EpollThread, waking all bthreads suspended in
// bthread_fd_*wait on it first (see EpollThread::fd_close).
int bthread_close(int fd) {
    return bthread::get_epoll_thread(fd).fd_close(fd);
}
549
550
}  // extern "C"