/src/brpc/src/bthread/fd.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // bthread - An M:N threading library to make applications more concurrent. |
19 | | |
20 | | // Date: Thu Aug 7 18:56:27 CST 2014 |
21 | | |
22 | | #include "butil/compat.h" |
23 | | #include <new> // std::nothrow |
24 | | #include <sys/poll.h> // poll() |
25 | | #if defined(OS_MACOSX) |
26 | | #include <sys/types.h> // struct kevent |
27 | | #include <sys/event.h> // kevent(), kqueue() |
28 | | #endif |
29 | | #include "butil/atomicops.h" |
30 | | #include "butil/time.h" |
31 | | #include "butil/fd_utility.h" // make_non_blocking |
32 | | #include "butil/logging.h" |
33 | | #include "butil/third_party/murmurhash3/murmurhash3.h" // fmix32 |
34 | | #include "butil/memory/scope_guard.h" |
35 | | #include "bthread/butex.h" // butex_* |
36 | | #include "bthread/task_group.h" // TaskGroup |
37 | | #include "bthread/bthread.h" // bthread_start_urgent |
38 | | |
39 | | namespace butil { |
40 | | extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime); |
41 | | } |
42 | | |
43 | | // Implement bthread functions on file descriptors |
44 | | |
45 | | namespace bthread { |
46 | | |
47 | | extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; |
48 | | |
49 | | template <typename T, size_t NBLOCK, size_t BLOCK_SIZE> |
50 | | class LazyArray { |
51 | | struct Block { |
52 | | butil::atomic<T> items[BLOCK_SIZE]; |
53 | | }; |
54 | | |
55 | | public: |
56 | 2 | LazyArray() { |
57 | 2 | memset(static_cast<void*>(_blocks), 0, sizeof(butil::atomic<Block*>) * NBLOCK); |
58 | 2 | } |
59 | | |
60 | 0 | butil::atomic<T>* get_or_new(size_t index) { |
61 | 0 | const size_t block_index = index / BLOCK_SIZE; |
62 | 0 | if (block_index >= NBLOCK) { |
63 | 0 | return NULL; |
64 | 0 | } |
65 | 0 | const size_t block_offset = index - block_index * BLOCK_SIZE; |
66 | 0 | Block* b = _blocks[block_index].load(butil::memory_order_consume); |
67 | 0 | if (b != NULL) { |
68 | 0 | return b->items + block_offset; |
69 | 0 | } |
70 | 0 | b = new (std::nothrow) Block; |
71 | 0 | if (NULL == b) { |
72 | 0 | b = _blocks[block_index].load(butil::memory_order_consume); |
73 | 0 | return (b ? b->items + block_offset : NULL); |
74 | 0 | } |
75 | | // Set items to default value of T. |
76 | 0 | std::fill(b->items, b->items + BLOCK_SIZE, T()); |
77 | 0 | Block* expected = NULL; |
78 | 0 | if (_blocks[block_index].compare_exchange_strong( |
79 | 0 | expected, b, butil::memory_order_release, |
80 | 0 | butil::memory_order_consume)) { |
81 | 0 | return b->items + block_offset; |
82 | 0 | } |
83 | 0 | delete b; |
84 | 0 | return expected->items + block_offset; |
85 | 0 | } |
86 | | |
87 | 0 | butil::atomic<T>* get(size_t index) const { |
88 | 0 | const size_t block_index = index / BLOCK_SIZE; |
89 | 0 | if (__builtin_expect(block_index < NBLOCK, 1)) { |
90 | 0 | const size_t block_offset = index - block_index * BLOCK_SIZE; |
91 | 0 | Block* const b = _blocks[block_index].load(butil::memory_order_consume); |
92 | 0 | if (__builtin_expect(b != NULL, 1)) { |
93 | 0 | return b->items + block_offset; |
94 | 0 | } |
95 | 0 | } |
96 | 0 | return NULL; |
97 | 0 | } |
98 | | |
99 | | private: |
100 | | butil::atomic<Block*> _blocks[NBLOCK]; |
101 | | }; |
102 | | |
103 | | typedef butil::atomic<int> EpollButex; |
104 | | |
105 | | static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L; |
106 | | |
107 | | #ifndef NDEBUG |
108 | | butil::static_atomic<int> break_nums = BUTIL_STATIC_ATOMIC_INIT(0); |
109 | | #endif |
110 | | |
111 | | // Able to address 67108864 (262144 * 256) file descriptors, which should be enough.
112 | | LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes; |
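// Lookup sketch (illustrative): a waiter maps an fd to its butex slot with
// get_or_new(); e.g. fd=1000 lands in block 1000/256 = 3 at offset
// 1000%256 = 232, and the 256-slot block is allocated lazily on first touch.
//
//   butil::atomic<EpollButex*>* slot = fd_butexes.get_or_new(1000);
//   if (slot == NULL) { /* fd out of range, or out of memory */ }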
113 | | |
114 | | static const int BTHREAD_DEFAULT_EPOLL_SIZE = 65536; |
115 | | |
116 | | class EpollThread { |
117 | | public: |
118 | | EpollThread() |
119 | 2 | : _epfd(-1) |
120 | 2 | , _stop(false) |
121 | 2 | , _tid(0) { |
122 | 2 | } |
123 | | |
124 | 0 | int start(int epoll_size) { |
125 | 0 | if (started()) { |
126 | 0 | return -1; |
127 | 0 | } |
128 | 0 | _start_mutex.lock(); |
129 | | // Double check |
130 | 0 | if (started()) { |
131 | 0 | _start_mutex.unlock(); |
132 | 0 | return -1; |
133 | 0 | } |
134 | 0 | #if defined(OS_LINUX) |
135 | 0 | _epfd = epoll_create(epoll_size); |
136 | | #elif defined(OS_MACOSX) |
137 | | _epfd = kqueue(); |
138 | | #endif |
139 | 0 | _start_mutex.unlock(); |
140 | 0 | if (_epfd < 0) { |
141 | 0 | PLOG(FATAL) << "Fail to epoll_create/kqueue"; |
142 | 0 | return -1; |
143 | 0 | } |
144 | 0 | bthread_attr_t attr = BTHREAD_ATTR_NORMAL; |
145 | 0 | bthread_attr_set_name(&attr, "EpollThread::run_this"); |
146 | 0 | if (bthread_start_background( |
147 | 0 | &_tid, &attr, EpollThread::run_this, this) != 0) { |
148 | 0 | close(_epfd); |
149 | 0 | _epfd = -1; |
150 | 0 | LOG(FATAL) << "Fail to create epoll bthread"; |
151 | 0 | return -1; |
152 | 0 | } |
153 | 0 | return 0; |
154 | 0 | } |
155 | | |
156 | | // Note: This function does not wake up suspended fd_wait. This is fine
157 | | // since stop_and_join is only called at program termination
158 | | // (g_task_control.stop()); suspended bthreads block neither the quit
159 | | // of worker pthreads nor the completion of g_task_control.stop().
160 | 0 | int stop_and_join() { |
161 | 0 | if (!started()) { |
162 | 0 | return 0; |
163 | 0 | } |
164 | | // No matter what this function returns, _epfd will be set to -1
165 | | // (making started() false) so that a later stop_and_join() does
166 | | // not enter again.
167 | 0 | const int saved_epfd = _epfd; |
168 | 0 | _epfd = -1; |
169 | | |
170 | | // epoll_wait cannot be woken up by closing _epfd. Instead, we wake
171 | | // it up by adding an fd that continuously triggers EPOLLOUT.
172 | | // Visibility of _stop: the constant EPOLLOUT events force epoll_wait
173 | | // to return repeatedly until it finally sees _stop == true.
174 | 0 | _stop = true; |
175 | 0 | int closing_epoll_pipe[2]; |
176 | 0 | if (pipe(closing_epoll_pipe)) { |
177 | 0 | PLOG(FATAL) << "Fail to create closing_epoll_pipe"; |
178 | 0 | return -1; |
179 | 0 | } |
180 | 0 | #if defined(OS_LINUX) |
181 | 0 | epoll_event evt = { EPOLLOUT, { NULL } }; |
182 | 0 | if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD, |
183 | 0 | closing_epoll_pipe[1], &evt) < 0) { |
184 | | #elif defined(OS_MACOSX) |
185 | | struct kevent kqueue_event; |
186 | | EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE, |
187 | | 0, 0, NULL); |
188 | | if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) { |
189 | | #endif |
190 | 0 | PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd=" |
191 | 0 | << saved_epfd; |
192 | 0 | return -1; |
193 | 0 | } |
194 | | |
195 | 0 | const int rc = bthread_join(_tid, NULL); |
196 | 0 | if (rc) { |
197 | 0 | LOG(FATAL) << "Fail to join EpollThread, " << berror(rc); |
198 | 0 | return -1; |
199 | 0 | } |
200 | 0 | close(closing_epoll_pipe[0]); |
201 | 0 | close(closing_epoll_pipe[1]); |
202 | 0 | close(saved_epfd); |
203 | 0 | return 0; |
204 | 0 | } |
205 | | |
206 | 0 | int fd_wait(int fd, unsigned events, const timespec* abstime) { |
207 | 0 | butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd); |
208 | 0 | if (NULL == p) { |
209 | 0 | errno = ENOMEM; |
210 | 0 | return -1; |
211 | 0 | } |
212 | | |
213 | 0 | EpollButex* butex = p->load(butil::memory_order_consume); |
214 | 0 | if (NULL == butex) { |
215 | | // It is rare to wait on one file descriptor from multiple threads
216 | | // simultaneously. Creating the singleton with optimistic locking
217 | | // here saves a mutex for each butex.
218 | 0 | butex = butex_create_checked<EpollButex>(); |
219 | 0 | butex->store(0, butil::memory_order_relaxed); |
220 | 0 | EpollButex* expected = NULL; |
221 | 0 | if (!p->compare_exchange_strong(expected, butex, |
222 | 0 | butil::memory_order_release, |
223 | 0 | butil::memory_order_consume)) { |
224 | 0 | butex_destroy(butex); |
225 | 0 | butex = expected; |
226 | 0 | } |
227 | 0 | } |
228 | | |
229 | 0 | while (butex == CLOSING_GUARD) { // bthread_close() is running. |
230 | 0 | if (sched_yield() < 0) { |
231 | 0 | return -1; |
232 | 0 | } |
233 | 0 | butex = p->load(butil::memory_order_consume); |
234 | 0 | } |
235 | | // Save the value of the butex before adding fd to epoll, because the
236 | | // butex may be changed before butex_wait. No explicit memory fence is
237 | | // needed: EPOLL_CTL_MOD and EPOLL_CTL_ADD imply a release fence.
238 | 0 | const int expected_val = butex->load(butil::memory_order_relaxed); |
239 | |
240 | 0 | #if defined(OS_LINUX) |
241 | | # ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG |
242 | | epoll_event evt = { events | EPOLLONESHOT, { butex } }; |
243 | | if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) { |
244 | | if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 && |
245 | | errno != EEXIST) { |
246 | | PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd; |
247 | | return -1; |
248 | | } |
249 | | } |
250 | | # else |
251 | 0 | epoll_event evt; |
252 | 0 | evt.events = events; |
253 | 0 | evt.data.fd = fd; |
254 | 0 | if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 && |
255 | 0 | errno != EEXIST) { |
256 | 0 | PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd; |
257 | 0 | return -1; |
258 | 0 | } |
259 | 0 | # endif |
260 | | #elif defined(OS_MACOSX) |
261 | | struct kevent kqueue_event; |
262 | | EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT, |
263 | | 0, 0, butex); |
264 | | if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) { |
265 | | PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd; |
266 | | return -1; |
267 | | } |
268 | | #endif |
269 | 0 | while (butex->load(butil::memory_order_relaxed) == expected_val) { |
270 | 0 | if (butex_wait(butex, expected_val, abstime) < 0 && |
271 | 0 | errno != EWOULDBLOCK && errno != EINTR) { |
272 | 0 | return -1; |
273 | 0 | } |
274 | 0 | } |
275 | 0 | return 0; |
276 | 0 | } |
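// Why the wait above loses no wakeups (sketch): expected_val is sampled
// before the fd is armed in epoll/kqueue. If the event fires between
// arming and butex_wait(), EpollThread::run() increments the butex first,
// so butex_wait() observes a changed value and returns immediately
// instead of sleeping.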
277 | | |
278 | 0 | int fd_close(int fd) { |
279 | 0 | if (fd < 0) { |
280 | | // Consistent with what close(-1) returns.
281 | 0 | errno = EBADF; |
282 | 0 | return -1; |
283 | 0 | } |
284 | 0 | butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd); |
285 | 0 | if (NULL == pbutex) { |
286 | | // Did not call bthread_fd functions, close directly. |
287 | 0 | return close(fd); |
288 | 0 | } |
289 | 0 | EpollButex* butex = pbutex->exchange( |
290 | 0 | CLOSING_GUARD, butil::memory_order_relaxed); |
291 | 0 | if (butex == CLOSING_GUARD) { |
292 | | // concurrent double close detected. |
293 | 0 | errno = EBADF; |
294 | 0 | return -1; |
295 | 0 | } |
296 | 0 | if (butex != NULL) { |
297 | 0 | butex->fetch_add(1, butil::memory_order_relaxed); |
298 | 0 | butex_wake_all(butex); |
299 | 0 | } |
300 | 0 | #if defined(OS_LINUX) |
301 | 0 | epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); |
302 | | #elif defined(OS_MACOSX) |
303 | | struct kevent evt; |
304 | | EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); |
305 | | kevent(_epfd, &evt, 1, NULL, 0, NULL); |
306 | | EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); |
307 | | kevent(_epfd, &evt, 1, NULL, 0, NULL); |
308 | | #endif |
309 | 0 | const int rc = close(fd); |
310 | 0 | pbutex->exchange(butex, butil::memory_order_relaxed); |
311 | 0 | return rc; |
312 | 0 | } |
313 | | |
314 | 0 | bool started() const { |
315 | 0 | return _epfd >= 0; |
316 | 0 | } |
317 | | |
318 | | private: |
319 | 0 | static void* run_this(void* arg) { |
320 | 0 | return static_cast<EpollThread*>(arg)->run(); |
321 | 0 | } |
322 | | |
323 | 0 | void* run() { |
324 | 0 | const int initial_epfd = _epfd; |
325 | 0 | const size_t MAX_EVENTS = 32; |
326 | 0 | #if defined(OS_LINUX) |
327 | 0 | epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS]; |
328 | | #elif defined(OS_MACOSX) |
329 | | typedef struct kevent KEVENT; |
330 | | struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS]; |
331 | | #endif |
332 | 0 | if (NULL == e) { |
333 | 0 | LOG(FATAL) << "Fail to new epoll_event"; |
334 | 0 | return NULL; |
335 | 0 | } |
336 | | |
337 | 0 | #if defined(OS_LINUX) |
338 | 0 | # ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG |
339 | 0 | DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower."; |
340 | 0 | # endif |
341 | 0 | #endif |
342 | 0 | while (!_stop) { |
343 | 0 | const int epfd = _epfd; |
344 | 0 | #if defined(OS_LINUX) |
345 | 0 | const int n = epoll_wait(epfd, e, MAX_EVENTS, -1); |
346 | | #elif defined(OS_MACOSX) |
347 | | const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL); |
348 | | #endif |
349 | 0 | if (_stop) { |
350 | 0 | break; |
351 | 0 | } |
352 | | |
353 | 0 | if (n < 0) { |
354 | 0 | if (errno == EINTR) { |
355 | 0 | #ifndef NDEBUG |
356 | 0 | break_nums.fetch_add(1, butil::memory_order_relaxed); |
357 | 0 | int* p = &errno; |
358 | 0 | const char* b = berror(); |
359 | 0 | const char* b2 = berror(errno); |
360 | 0 | DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", " |
361 | 0 | << errno << " " << p << " " << b << " " << b2; |
362 | 0 | #endif |
363 | 0 | continue; |
364 | 0 | } |
365 | | |
366 | 0 | PLOG(INFO) << "Fail to epoll epfd=" << epfd; |
367 | 0 | break; |
368 | 0 | } |
369 | | |
370 | 0 | #if defined(OS_LINUX) |
371 | 0 | # ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG |
372 | 0 | for (int i = 0; i < n; ++i) { |
373 | 0 | epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL); |
374 | 0 | } |
375 | 0 | # endif |
376 | 0 | #endif |
377 | 0 | for (int i = 0; i < n; ++i) { |
378 | 0 | #if defined(OS_LINUX) |
379 | | # ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG |
380 | | EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr); |
381 | | # else |
382 | 0 | butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd); |
383 | 0 | EpollButex* butex = pbutex ? |
384 | 0 | pbutex->load(butil::memory_order_consume) : NULL; |
385 | 0 | # endif |
386 | | #elif defined(OS_MACOSX) |
387 | | EpollButex* butex = static_cast<EpollButex*>(e[i].udata); |
388 | | #endif |
389 | 0 | if (butex != NULL && butex != CLOSING_GUARD) { |
390 | 0 | butex->fetch_add(1, butil::memory_order_relaxed); |
391 | 0 | butex_wake_all(butex); |
392 | 0 | } |
393 | 0 | } |
394 | 0 | } |
395 | |
396 | 0 | delete [] e; |
397 | 0 | DLOG(INFO) << "EpollThread=" << _tid << "(epfd=" |
398 | 0 | << initial_epfd << ") is about to stop"; |
399 | 0 | return NULL; |
400 | 0 | } |
401 | | |
402 | | int _epfd; |
403 | | bool _stop; |
404 | | bthread_t _tid; |
405 | | butil::Mutex _start_mutex; |
406 | | }; |
407 | | |
408 | | EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM]; |
409 | | |
410 | 0 | static inline EpollThread& get_epoll_thread(int fd) { |
411 | 0 | if (BTHREAD_EPOLL_THREAD_NUM == 1UL) { |
412 | 0 | EpollThread& et = epoll_thread[0]; |
413 | 0 | et.start(BTHREAD_DEFAULT_EPOLL_SIZE); |
414 | 0 | return et; |
415 | 0 | } |
416 | | |
417 | 0 | EpollThread& et = epoll_thread[butil::fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM]; |
418 | 0 | et.start(BTHREAD_DEFAULT_EPOLL_SIZE); |
419 | 0 | return et; |
420 | 0 | } |
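// Note: butil::fmix32 (the MurmurHash3 finalizer) decorrelates consecutive
// fds so they spread roughly uniformly across the epoll threads instead of
// clustering on one.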
421 | | |
422 | | //TODO(zhujiashun): change name |
423 | 0 | int stop_and_join_epoll_threads() { |
424 | | // Returns -1 if any epoll thread failed to stop. |
425 | 0 | int rc = 0; |
426 | 0 | for (size_t i = 0; i < BTHREAD_EPOLL_THREAD_NUM; ++i) { |
427 | 0 | if (epoll_thread[i].stop_and_join() < 0) { |
428 | 0 | rc = -1; |
429 | 0 | } |
430 | 0 | } |
431 | 0 | return rc; |
432 | 0 | } |
433 | | |
434 | | // Fallback for callers not running in a bthread worker (plain pthreads).
435 | | int pthread_fd_wait(int fd, unsigned events, |
436 | 0 | const timespec* abstime) { |
437 | 0 | return butil::pthread_fd_wait(fd, events, abstime); |
438 | 0 | } |
439 | | |
440 | | } // namespace bthread |
441 | | |
442 | | extern "C" { |
443 | | |
444 | 0 | int bthread_fd_wait(int fd, unsigned events) { |
445 | 0 | if (fd < 0) { |
446 | 0 | errno = EINVAL; |
447 | 0 | return -1; |
448 | 0 | } |
449 | 0 | bthread::TaskGroup* g = bthread::tls_task_group; |
450 | 0 | if (NULL != g && !g->is_current_pthread_task()) { |
451 | 0 | return bthread::get_epoll_thread(fd).fd_wait( |
452 | 0 | fd, events, NULL); |
453 | 0 | } |
454 | 0 | return bthread::pthread_fd_wait(fd, events, NULL); |
455 | 0 | } |
456 | | |
457 | | int bthread_fd_timedwait(int fd, unsigned events, |
458 | 0 | const timespec* abstime) { |
459 | 0 | if (NULL == abstime) { |
460 | 0 | return bthread_fd_wait(fd, events); |
461 | 0 | } |
462 | 0 | if (fd < 0) { |
463 | 0 | errno = EINVAL; |
464 | 0 | return -1; |
465 | 0 | } |
466 | 0 | bthread::TaskGroup* g = bthread::tls_task_group; |
467 | 0 | if (NULL != g && !g->is_current_pthread_task()) { |
468 | 0 | return bthread::get_epoll_thread(fd).fd_wait( |
469 | 0 | fd, events, abstime); |
470 | 0 | } |
471 | 0 | return bthread::pthread_fd_wait(fd, events, abstime); |
472 | 0 | } |
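// Usage sketch (illustrative; sockfd is a hypothetical non-blocking
// socket): wait at most 500ms for readability. Pass EVFILT_READ instead
// of EPOLLIN on OS_MACOSX.
//
//   timespec abstime = butil::milliseconds_from_now(500);
//   if (bthread_fd_timedwait(sockfd, EPOLLIN, &abstime) < 0) {
//       PLOG(WARNING) << "Fail to wait on sockfd=" << sockfd;
//   }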
473 | | |
474 | | int bthread_connect(int sockfd, const sockaddr* serv_addr, |
475 | | socklen_t addrlen) { |
476 | | bthread::TaskGroup* g = bthread::tls_task_group; |
477 | | if (NULL == g || g->is_current_pthread_task()) { |
478 | | return ::connect(sockfd, serv_addr, addrlen); |
479 | | } |
480 | | |
481 | | bool is_blocking = butil::is_blocking(sockfd); |
482 | | if (is_blocking) { |
483 | | butil::make_non_blocking(sockfd); |
484 | | } |
485 | | // Scoped non-blocking. |
486 | 0 | auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() { |
487 | 0 | if (is_blocking) { |
488 | 0 | butil::make_blocking(sockfd); |
489 | 0 | } |
490 | 0 | }); |
491 | | |
492 | | const int rc = ::connect(sockfd, serv_addr, addrlen); |
493 | | if (rc == 0 || errno != EINPROGRESS) { |
494 | | return rc; |
495 | | } |
496 | | #if defined(OS_LINUX) |
497 | | if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) { |
498 | | #elif defined(OS_MACOSX) |
499 | | if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) { |
500 | | #endif |
501 | | return -1; |
502 | | } |
503 | | |
504 | | if (butil::is_connected(sockfd) != 0) { |
505 | | return -1; |
506 | | } |
507 | | |
508 | | return 0; |
509 | | } |
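// Usage sketch (illustrative): inside a bthread, bthread_connect is a
// drop-in replacement for ::connect that suspends the calling bthread
// instead of blocking the worker pthread.
//
//   int sockfd = socket(AF_INET, SOCK_STREAM, 0);
//   sockaddr_in addr;  // hypothetical peer address, filled by the caller
//   if (bthread_connect(sockfd, (sockaddr*)&addr, sizeof(addr)) != 0) {
//       PLOG(WARNING) << "Fail to connect";
//   }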
510 | | |
511 | | int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr, |
512 | 0 | socklen_t addrlen, const timespec* abstime) { |
513 | 0 | if (!abstime) { |
514 | 0 | return bthread_connect(sockfd, serv_addr, addrlen); |
515 | 0 | } |
516 | | |
517 | 0 | bool is_blocking = butil::is_blocking(sockfd); |
518 | 0 | if (is_blocking) { |
519 | 0 | butil::make_non_blocking(sockfd); |
520 | 0 | } |
521 | | // Scoped non-blocking. |
522 | 0 | auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() { |
523 | 0 | if (is_blocking) { |
524 | 0 | butil::make_blocking(sockfd); |
525 | 0 | } |
526 | 0 | }); |
527 | |
528 | 0 | const int rc = ::connect(sockfd, serv_addr, addrlen); |
529 | 0 | if (rc == 0 || errno != EINPROGRESS) { |
530 | 0 | return rc; |
531 | 0 | } |
532 | 0 | #if defined(OS_LINUX) |
533 | 0 | if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) { |
534 | | #elif defined(OS_MACOSX) |
535 | | if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) { |
536 | | #endif |
537 | 0 | return -1; |
538 | 0 | } |
539 | | |
540 | 0 | if (butil::is_connected(sockfd) != 0) { |
541 | 0 | return -1; |
542 | 0 | } |
543 | | |
544 | 0 | return 0; |
545 | 0 | } |
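// Usage sketch (illustrative): same as bthread_connect above, but give up
// after 200ms; errno is ETIMEDOUT when the deadline is exceeded.
//
//   timespec abstime = butil::milliseconds_from_now(200);
//   if (bthread_timed_connect(sockfd, (sockaddr*)&addr,
//                             sizeof(addr), &abstime) != 0) {
//       PLOG(WARNING) << "Fail to connect within 200ms";
//   }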
546 | | |
547 | | // This does not wake pthreads calling bthread_fd_*wait. |
548 | 0 | int bthread_close(int fd) { |
549 | 0 | return bthread::get_epoll_thread(fd).fd_close(fd); |
550 | 0 | } |
551 | | |
552 | | } // extern "C" |