Coverage Report

Created: 2025-11-24 06:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/butil/details/extended_endpoint.hpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#ifndef BUTIL_DETAILS_EXTENDED_ENDPOINT_H
19
#define BUTIL_DETAILS_EXTENDED_ENDPOINT_H
20
21
#include <arpa/inet.h>
22
#include <netdb.h>
23
#include <unistd.h>
24
#include <string.h>
25
#include <stdio.h>
26
#include <stdlib.h>
27
#include <sys/un.h>
28
#include <mutex>
29
#include <unordered_set>
30
#include "butil/endpoint.h"
31
#include "butil/logging.h"
32
#include "butil/strings/string_piece.h"
33
#include "butil/resource_pool.h"
34
#include "butil/memory/singleton_on_pthread_once.h"
35
36
namespace butil {
37
namespace details {
38
39
#if __cplusplus >= 201103L
40
static_assert(sizeof(EndPoint) == sizeof(EndPoint::ip) + sizeof(EndPoint::port),
41
        "EndPoint size mismatch with the one in POD-style, may cause ABI problem");
42
#endif
43
44
// For ipv6/unix socket address.
45
//
46
// We have to keep butil::EndPoint ABI compatible because it is used so widely, and the size of butil::EndPoint is
47
// too small to store more information such as ipv6 address.
48
// We store enough information about endpoint in such tiny struct by putting real things in another big object
49
// held by ResourcePool. The EndPoint::ip saves ResourceId, while EndPoint::port denotes if the EndPoint object
50
// is an old style ipv4 endpoint.
51
// Note that since ResourcePool has been implemented in bthread, we copy it into this repo and change its namespace to
52
// butil::details. Those two headers will not be published.
53
54
// If EndPoint.port equals to this value, we should get the extended endpoint in resource pool.
55
// The value lies outside the valid port range [0, 65535], so it cannot collide with a real port.
static const int EXTENDED_ENDPOINT_PORT = 123456789;
56
57
class ExtendedEndPoint;
58
59
// A global unordered set to dedup ExtendedEndPoint
60
// ExtendedEndPoints which have same ipv6/unix socket address must have same id,
61
// so that user can simply use the value of EndPoint for comparison.
62
class GlobalEndPointSet {
63
public:
64
    ExtendedEndPoint* insert(ExtendedEndPoint* p);
65
66
    void erase(ExtendedEndPoint* p);
67
68
0
    static GlobalEndPointSet* instance() {
69
0
        return ::butil::get_leaky_singleton<GlobalEndPointSet>();
70
0
    }
71
72
private:
73
    struct Hash {
74
        size_t operator()(ExtendedEndPoint* const& p) const;
75
    };
76
77
    struct Equals {
78
        bool operator()(ExtendedEndPoint* const& p1, ExtendedEndPoint* const& p2) const;
79
    };
80
81
    typedef std::unordered_set<ExtendedEndPoint*, Hash, Equals> SetType;
82
    SetType _set;
83
    std::mutex _mutex;
84
};
85
86
class ExtendedEndPoint {
87
public:
88
    // Construct ExtendedEndPoint.
89
    // User should use create() functions to get ExtendedEndPoint instance.
90
0
    ExtendedEndPoint(void) {
91
0
        _ref_count.store(0, butil::memory_order_relaxed);
92
0
        _u.sa.sa_family = AF_UNSPEC;
93
0
    }
94
95
public:
96
    // Create ExtendedEndPoint.
97
    // If creation is successful, create()s will embed the ExtendedEndPoint instance in the given EndPoint*,
98
    // and return it as well. Or else, the given EndPoint* won't be touched.
99
    //
100
    // The format of the parameter is inspired by nginx.
101
    // Valid forms are:
102
    // - ipv6
103
    // without port: [2400:da00::3b0b]
104
    // with    port: [2400:da00::3b0b]:8080
105
    // - unix domain socket
106
    // abslute path : unix:/path/to/file.sock
107
    // relative path: unix:path/to/file.sock
108
109
0
    static ExtendedEndPoint* create(StringPiece sp, EndPoint* ep) {
110
0
        sp.trim_spaces();
111
0
        if (sp.empty()) {
112
0
            return NULL;
113
0
        }
114
0
        if (sp[0] == '[') {
115
0
            size_t colon_pos = sp.find(']');
116
0
            if (colon_pos == StringPiece::npos || colon_pos == 1 /* [] is invalid */ || ++colon_pos >= sp.size()) {
117
0
                return NULL;
118
0
            }
119
0
            StringPiece port_sp = sp.substr(colon_pos);
120
0
            if (port_sp.size() < 2 /* colon and at least one integer */ || port_sp[0] != ':') {
121
0
                return NULL;
122
0
            }
123
0
            port_sp.remove_prefix(1); // remove `:'
124
0
            if (port_sp.size() > 5) { // max 65535
125
0
                return NULL;
126
0
            }
127
0
            char buf[6];
128
0
            buf[port_sp.copy(buf, port_sp.size())] = '\0';
129
0
            char* end = NULL;
130
0
            int port = ::strtol(buf, &end, 10 /* base */);
131
0
            if (end != buf + port_sp.size()) {
132
0
                return NULL;
133
0
            }
134
0
            return create(sp.substr(0, colon_pos), port, ep);
135
0
        } else if (sp.starts_with("unix:")) {
136
0
            return create(sp, EXTENDED_ENDPOINT_PORT, ep);
137
0
        }
138
0
        return NULL;
139
0
    }
140
141
0
    static ExtendedEndPoint* create(StringPiece sp, int port, EndPoint* ep) {
142
0
        sp.trim_spaces();
143
0
        if (sp.empty()) {
144
0
            return NULL;
145
0
        }
146
0
        ExtendedEndPoint* eep = NULL;
147
0
        if (sp[0] == '[' && port >= 0 && port <= 65535) {
148
0
            if (sp.back() != ']' || sp.size() == 2 || sp.size() - 2 >= INET6_ADDRSTRLEN) {
149
0
                return NULL;
150
0
            }
151
0
            char buf[INET6_ADDRSTRLEN];
152
0
            buf[sp.copy(buf, sp.size() - 2 /* skip `[' and `]' */, 1 /* skip `[' */)] = '\0';
153
154
0
            in6_addr addr;
155
0
            if (inet_pton(AF_INET6, buf, &addr) != 1 /* succ */) {
156
0
                return NULL;
157
0
            }
158
159
0
            eep = new_extended_endpoint(AF_INET6);
160
0
            if (eep) {
161
0
                eep->_u.in6.sin6_addr = addr;
162
0
                eep->_u.in6.sin6_port = htons(port);
163
0
                eep->_u.in6.sin6_flowinfo = 0u;
164
0
                eep->_u.in6.sin6_scope_id = 0u;
165
0
                eep->_socklen = sizeof(_u.in6);
166
#if defined(OS_MACOSX)
167
                eep->_u.in6.sin6_len = eep->_socklen;
168
#endif
169
0
            }
170
0
        } else if (sp.starts_with("unix:")) { // ignore port
171
0
            sp.remove_prefix(5); // remove `unix:'
172
0
            if (sp.empty() || sp.size() >= UDS_PATH_SIZE) {
173
0
                return NULL;
174
0
            }
175
0
            eep = new_extended_endpoint(AF_UNIX);
176
0
            if (eep) {
177
0
                int size = sp.copy(eep->_u.un.sun_path, sp.size());
178
0
                eep->_u.un.sun_path[size] = '\0';
179
0
                eep->_socklen = offsetof(sockaddr_un, sun_path) + size + 1;
180
#if defined(OS_MACOSX)
181
                eep->_u.un.sun_len = eep->_socklen;
182
#endif
183
0
            }
184
0
        }
185
0
        if (eep) {
186
0
            eep = dedup(eep);
187
0
            eep->embed_to(ep);
188
0
        }
189
0
        return eep;
190
0
    }
191
192
0
    static ExtendedEndPoint* create(sockaddr_storage* ss, socklen_t size, EndPoint* ep) {
193
0
        ExtendedEndPoint* eep = NULL;
194
0
        if (ss->ss_family == AF_INET6 || ss->ss_family == AF_UNIX) {
195
0
            eep = new_extended_endpoint(ss->ss_family);
196
0
        }
197
0
        if (eep) {
198
0
            memcpy(&eep->_u.ss, ss, size);
199
0
            eep->_socklen = size;
200
0
            if (ss->ss_family == AF_UNIX && size == offsetof(sockaddr_un, sun_path)) {
201
                // See unix(7): When the address of an unnamed socket is returned, 
202
                // its length is sizeof(sa_family_t), and sun_path should not be inspected.
203
0
                eep->_u.un.sun_path[0] = '\0';
204
0
            }
205
0
            eep = dedup(eep);
206
0
            eep->embed_to(ep);
207
0
        }
208
0
        return eep;
209
0
    }
210
211
    // Get ExtendedEndPoint instance from EndPoint
212
0
    static ExtendedEndPoint* address(const EndPoint& ep) {
213
0
        if (!is_extended(ep)) {
214
0
            return NULL;
215
0
        }
216
0
        ::butil::ResourceId<ExtendedEndPoint> id;
217
0
        id.value = ep.ip.s_addr;
218
0
        ExtendedEndPoint* eep = ::butil::address_resource<ExtendedEndPoint>(id);
219
0
        CHECK(eep) << "fail to address ExtendedEndPoint from EndPoint";
220
0
        return eep;
221
0
    }
222
223
    // Check if an EndPoint has embedded ExtendedEndPoint
224
0
    static bool is_extended(const butil::EndPoint& ep) {
225
0
        return ep.port == EXTENDED_ENDPOINT_PORT;
226
0
    }
227
228
private:
229
    friend class GlobalEndPointSet;
230
231
0
    static GlobalEndPointSet* global_set() {
232
0
        return GlobalEndPointSet::instance();
233
0
    }
234
235
0
    static ExtendedEndPoint* new_extended_endpoint(sa_family_t family) {
236
0
        ::butil::ResourceId<ExtendedEndPoint> id;
237
0
        ExtendedEndPoint* eep = ::butil::get_resource(&id);
238
0
        if (eep) {
239
0
            int64_t old_ref = eep->_ref_count.load(butil::memory_order_relaxed);
240
0
            CHECK(old_ref == 0) << "new ExtendedEndPoint has reference " << old_ref;
241
0
            CHECK(eep->_u.sa.sa_family == AF_UNSPEC) << "new ExtendedEndPoint has family " << eep->_u.sa.sa_family << " set";
242
0
            eep->_ref_count.store(1, butil::memory_order_relaxed);
243
0
            eep->_id = id;
244
0
            eep->_u.sa.sa_family = family;
245
0
        }
246
0
        return eep;
247
0
    }
248
249
0
    void embed_to(EndPoint* ep) const {
250
0
        CHECK(0 == _id.value >> 32) << "ResourceId beyond index";
251
0
        ep->reset();
252
0
        ep->ip = ip_t{static_cast<uint32_t>(_id.value)};
253
0
        ep->port = EXTENDED_ENDPOINT_PORT;
254
0
    }
255
256
0
    static ExtendedEndPoint* dedup(ExtendedEndPoint* eep) {
257
0
        eep->_hash = std::hash<std::string>()(std::string((const char*)&eep->_u, eep->_socklen));
258
259
0
        ExtendedEndPoint* first_eep = global_set()->insert(eep);
260
0
        if (first_eep != eep) {
261
0
            eep->_ref_count.store(0, butil::memory_order_relaxed);
262
0
            eep->_u.sa.sa_family = AF_UNSPEC;
263
0
            ::butil::return_resource(eep->_id);
264
0
        }
265
0
        return first_eep;
266
0
    }
267
268
public:
269
270
0
    void dec_ref(void) {
271
0
        int64_t old_ref = _ref_count.fetch_sub(1, butil::memory_order_relaxed);
272
0
        CHECK(old_ref >= 1) << "ExtendedEndPoint has unexpected reference " << old_ref;
273
0
        if (old_ref == 1) {
274
0
            global_set()->erase(this);
275
0
            _u.sa.sa_family = AF_UNSPEC;
276
0
            ::butil::return_resource(_id);
277
0
        }
278
0
    }
279
280
0
    void inc_ref(void) {
281
0
        int64_t old_ref = _ref_count.fetch_add(1, butil::memory_order_relaxed);
282
0
        CHECK(old_ref >= 1) << "ExtendedEndPoint has unexpected reference " << old_ref;
283
0
    }
284
285
0
    sa_family_t family(void) const {
286
0
        return _u.sa.sa_family;
287
0
    }
288
289
0
    int to(sockaddr_storage* ss) const {
290
0
        memcpy(ss, &_u.ss, _socklen);
291
0
        return _socklen;
292
0
    }
293
294
0
    void to(EndPointStr* ep_str) const {
295
0
        if (_u.sa.sa_family == AF_UNIX) {
296
0
            snprintf(ep_str->_buf, sizeof(ep_str->_buf), "unix:%s", _u.un.sun_path);
297
0
        } else if (_u.sa.sa_family == AF_INET6) {
298
0
            char buf[INET6_ADDRSTRLEN] = {0};
299
0
            const char* ret = inet_ntop(_u.sa.sa_family, &_u.in6.sin6_addr, buf, sizeof(buf));
300
0
            CHECK(ret) << "fail to do inet_ntop";
301
0
            snprintf(ep_str->_buf, sizeof(ep_str->_buf), "[%s]:%d", buf, ntohs(_u.in6.sin6_port));
302
0
        } else {
303
0
            CHECK(0) << "family " << _u.sa.sa_family << " not supported";
304
0
        }
305
0
    }
306
307
0
    int to_hostname(char* host, size_t host_len) const {
308
0
        if (_u.sa.sa_family == AF_UNIX) {
309
0
            snprintf(host, host_len, "unix:%s", _u.un.sun_path);
310
0
            return 0;
311
0
        } else if (_u.sa.sa_family == AF_INET6) {
312
0
            sockaddr_in6 sa = _u.in6;
313
0
            if (getnameinfo((const sockaddr*) &sa, sizeof(sa), host, host_len, NULL, 0, NI_NAMEREQD) != 0) {
314
0
                return -1;
315
0
            }
316
0
            size_t len = ::strlen(host);
317
0
            if (len + 1 < host_len) {
318
0
                snprintf(host + len, host_len - len, ":%d", _u.in6.sin6_port);
319
0
            }
320
0
            return 0;
321
0
        } else {
322
0
            CHECK(0) << "family " << _u.sa.sa_family << " not supported";
323
0
            return -1;
324
0
        }
325
0
    }
326
327
private:
328
    static const size_t UDS_PATH_SIZE = sizeof(sockaddr_un::sun_path);
329
330
    butil::atomic<int64_t> _ref_count;
331
    butil::ResourceId<ExtendedEndPoint> _id;
332
    size_t _hash;  // pre-compute hash code of sockaddr for saving unordered_set query time
333
    socklen_t _socklen; // valid data length of sockaddr
334
    union {
335
        sockaddr sa;
336
        sockaddr_in6 in6;
337
        sockaddr_un un;
338
        sockaddr_storage ss;
339
    } _u;
340
};
341
342
0
// Insert `p' into the set while holding the mutex.
// Returns `p' if it is now the stored element, or the previously stored equal
// element (with one more reference taken on it) when that one is still alive.
inline ExtendedEndPoint* GlobalEndPointSet::insert(ExtendedEndPoint* p) {
    std::lock_guard<std::mutex> guard(_mutex);
    auto found = _set.find(p);
    if (found == _set.end()) {
        _set.insert(p);
        return p;
    }
    ExtendedEndPoint* const existing = *found;
    const int64_t prev_ref = existing->_ref_count.fetch_add(1, butil::memory_order_relaxed);
    if (prev_ref != 0) {
        // the ExtendedEndPoint is valid, reuse it
        return existing;
    }
    // another thread is calling dec_ref(), do not reuse it:
    // undo the increment and let `p' take its place in the set
    existing->_ref_count.fetch_sub(1, butil::memory_order_relaxed);
    _set.erase(found);
    _set.insert(p);
    return p;
}
360
361
0
// Remove `p' from the set while holding the mutex.
// Nothing happens unless the stored element is this very pointer.
inline void GlobalEndPointSet::erase(ExtendedEndPoint* p) {
    std::lock_guard<std::mutex> guard(_mutex);
    auto found = _set.find(p);
    // If nothing is found, or an equal but different instance is stored,
    // another thread has already erased `p'.
    if (found != _set.end() && *found == p) {
        _set.erase(found);
    }
}
370
371
0
// Hash of an element: the value pre-computed and stored in ExtendedEndPoint::_hash.
inline size_t GlobalEndPointSet::Hash::operator()(ExtendedEndPoint* const& p) const {
    const size_t cached_hash = p->_hash;
    return cached_hash;
}
374
375
0
// Two elements are equal when their valid address lengths match and the
// first `_socklen' bytes of their socket addresses are identical.
inline bool GlobalEndPointSet::Equals::operator()(ExtendedEndPoint* const& p1, ExtendedEndPoint* const& p2) const {
    if (p1->_socklen != p2->_socklen) {
        return false;
    }
    return memcmp(&p1->_u, &p2->_u, p1->_socklen) == 0;
}
379
380
} // namespace details
381
} // namespace butil
382
383
#endif // BUTIL_DETAILS_EXTENDED_ENDPOINT_H