/src/brpc/src/butil/details/extended_endpoint.hpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #ifndef BUTIL_DETAILS_EXTENDED_ENDPOINT_H |
19 | | #define BUTIL_DETAILS_EXTENDED_ENDPOINT_H |
20 | | |
21 | | #include <arpa/inet.h> |
22 | | #include <netdb.h> |
23 | | #include <unistd.h> |
24 | | #include <string.h> |
25 | | #include <stdio.h> |
26 | | #include <stdlib.h> |
27 | | #include <sys/un.h> |
28 | | #include <mutex> |
29 | | #include <unordered_set> |
30 | | #include "butil/endpoint.h" |
31 | | #include "butil/logging.h" |
32 | | #include "butil/strings/string_piece.h" |
33 | | #include "butil/resource_pool.h" |
34 | | #include "butil/memory/singleton_on_pthread_once.h" |
35 | | |
36 | | namespace butil { |
37 | | namespace details { |
38 | | |
39 | | #if __cplusplus >= 201103L |
40 | | static_assert(sizeof(EndPoint) == sizeof(EndPoint::ip) + sizeof(EndPoint::port), |
41 | | "EndPoint size mismatch with the one in POD-style, may cause ABI problem"); |
42 | | #endif |
43 | | |
44 | | // For ipv6/unix socket address. |
45 | | // |
46 | | // We have to keep butil::EndPoint ABI compatible because it is used so widely, and the size of butil::EndPoint is |
47 | | // too small to store more information such as ipv6 address. |
48 | | // We store enough information about the endpoint in such a tiny struct by putting the real data in another, bigger object |
49 | | // held by ResourcePool. EndPoint::ip stores the ResourceId, while EndPoint::port denotes whether the EndPoint object |
50 | | // is an old-style ipv4 endpoint. |
51 | | // Note that since ResourcePool has been implemented in bthread, we copy it into this repo and change its namespace to |
52 | | // butil::details. Those two headers will not be published. |
53 | | |
// Marker value for EndPoint::port. When an EndPoint carries this port, the real
// address is an ExtendedEndPoint that has to be looked up in the resource pool.
// The value is larger than 65535, the highest port accepted by create().
static const int EXTENDED_ENDPOINT_PORT = 123456789;
56 | | |
57 | | class ExtendedEndPoint; |
58 | | |
59 | | // A global unordered set to dedup ExtendedEndPoint |
60 | | // ExtendedEndPoints which have same ipv6/unix socket address must have same id, |
61 | | // so that user can simply use the value of EndPoint for comparision. |
62 | | class GlobalEndPointSet { |
63 | | public: |
64 | | ExtendedEndPoint* insert(ExtendedEndPoint* p); |
65 | | |
66 | | void erase(ExtendedEndPoint* p); |
67 | | |
68 | 0 | static GlobalEndPointSet* instance() { |
69 | 0 | return ::butil::get_leaky_singleton<GlobalEndPointSet>(); |
70 | 0 | } |
71 | | |
72 | | private: |
73 | | struct Hash { |
74 | | size_t operator()(ExtendedEndPoint* const& p) const; |
75 | | }; |
76 | | |
77 | | struct Equals { |
78 | | bool operator()(ExtendedEndPoint* const& p1, ExtendedEndPoint* const& p2) const; |
79 | | }; |
80 | | |
81 | | typedef std::unordered_set<ExtendedEndPoint*, Hash, Equals> SetType; |
82 | | SetType _set; |
83 | | std::mutex _mutex; |
84 | | }; |
85 | | |
86 | | class ExtendedEndPoint { |
87 | | public: |
88 | | // Construct ExtendedEndPoint. |
89 | | // User should use create() functions to get ExtendedEndPoint instance. |
90 | 0 | ExtendedEndPoint(void) { |
91 | 0 | _ref_count.store(0, butil::memory_order_relaxed); |
92 | 0 | _u.sa.sa_family = AF_UNSPEC; |
93 | 0 | } |
94 | | |
95 | | public: |
96 | | // Create ExtendedEndPoint. |
97 | | // If creation is successful, create()s will embed the ExtendedEndPoint instance in the given EndPoint*, |
98 | | // and return it as well. Or else, the given EndPoint* won't be touched. |
99 | | // |
100 | | // The format of the parameter is inspired by nginx. |
101 | | // Valid forms are: |
102 | | // - ipv6 |
103 | | // without port: [2400:da00::3b0b] |
104 | | // with port: [2400:da00::3b0b]:8080 |
105 | | // - unix domain socket |
106 | | // abslute path : unix:/path/to/file.sock |
107 | | // relative path: unix:path/to/file.sock |
108 | | |
109 | 0 | static ExtendedEndPoint* create(StringPiece sp, EndPoint* ep) { |
110 | 0 | sp.trim_spaces(); |
111 | 0 | if (sp.empty()) { |
112 | 0 | return NULL; |
113 | 0 | } |
114 | 0 | if (sp[0] == '[') { |
115 | 0 | size_t colon_pos = sp.find(']'); |
116 | 0 | if (colon_pos == StringPiece::npos || colon_pos == 1 /* [] is invalid */ || ++colon_pos >= sp.size()) { |
117 | 0 | return NULL; |
118 | 0 | } |
119 | 0 | StringPiece port_sp = sp.substr(colon_pos); |
120 | 0 | if (port_sp.size() < 2 /* colon and at least one integer */ || port_sp[0] != ':') { |
121 | 0 | return NULL; |
122 | 0 | } |
123 | 0 | port_sp.remove_prefix(1); // remove `:' |
124 | 0 | if (port_sp.size() > 5) { // max 65535 |
125 | 0 | return NULL; |
126 | 0 | } |
127 | 0 | char buf[6]; |
128 | 0 | buf[port_sp.copy(buf, port_sp.size())] = '\0'; |
129 | 0 | char* end = NULL; |
130 | 0 | int port = ::strtol(buf, &end, 10 /* base */); |
131 | 0 | if (end != buf + port_sp.size()) { |
132 | 0 | return NULL; |
133 | 0 | } |
134 | 0 | return create(sp.substr(0, colon_pos), port, ep); |
135 | 0 | } else if (sp.starts_with("unix:")) { |
136 | 0 | return create(sp, EXTENDED_ENDPOINT_PORT, ep); |
137 | 0 | } |
138 | 0 | return NULL; |
139 | 0 | } |
140 | | |
141 | 0 | static ExtendedEndPoint* create(StringPiece sp, int port, EndPoint* ep) { |
142 | 0 | sp.trim_spaces(); |
143 | 0 | if (sp.empty()) { |
144 | 0 | return NULL; |
145 | 0 | } |
146 | 0 | ExtendedEndPoint* eep = NULL; |
147 | 0 | if (sp[0] == '[' && port >= 0 && port <= 65535) { |
148 | 0 | if (sp.back() != ']' || sp.size() == 2 || sp.size() - 2 >= INET6_ADDRSTRLEN) { |
149 | 0 | return NULL; |
150 | 0 | } |
151 | 0 | char buf[INET6_ADDRSTRLEN]; |
152 | 0 | buf[sp.copy(buf, sp.size() - 2 /* skip `[' and `]' */, 1 /* skip `[' */)] = '\0'; |
153 | |
|
154 | 0 | in6_addr addr; |
155 | 0 | if (inet_pton(AF_INET6, buf, &addr) != 1 /* succ */) { |
156 | 0 | return NULL; |
157 | 0 | } |
158 | | |
159 | 0 | eep = new_extended_endpoint(AF_INET6); |
160 | 0 | if (eep) { |
161 | 0 | eep->_u.in6.sin6_addr = addr; |
162 | 0 | eep->_u.in6.sin6_port = htons(port); |
163 | 0 | eep->_u.in6.sin6_flowinfo = 0u; |
164 | 0 | eep->_u.in6.sin6_scope_id = 0u; |
165 | 0 | eep->_socklen = sizeof(_u.in6); |
166 | | #if defined(OS_MACOSX) |
167 | | eep->_u.in6.sin6_len = eep->_socklen; |
168 | | #endif |
169 | 0 | } |
170 | 0 | } else if (sp.starts_with("unix:")) { // ignore port |
171 | 0 | sp.remove_prefix(5); // remove `unix:' |
172 | 0 | if (sp.empty() || sp.size() >= UDS_PATH_SIZE) { |
173 | 0 | return NULL; |
174 | 0 | } |
175 | 0 | eep = new_extended_endpoint(AF_UNIX); |
176 | 0 | if (eep) { |
177 | 0 | int size = sp.copy(eep->_u.un.sun_path, sp.size()); |
178 | 0 | eep->_u.un.sun_path[size] = '\0'; |
179 | 0 | eep->_socklen = offsetof(sockaddr_un, sun_path) + size + 1; |
180 | | #if defined(OS_MACOSX) |
181 | | eep->_u.un.sun_len = eep->_socklen; |
182 | | #endif |
183 | 0 | } |
184 | 0 | } |
185 | 0 | if (eep) { |
186 | 0 | eep = dedup(eep); |
187 | 0 | eep->embed_to(ep); |
188 | 0 | } |
189 | 0 | return eep; |
190 | 0 | } |
191 | | |
192 | 0 | static ExtendedEndPoint* create(sockaddr_storage* ss, socklen_t size, EndPoint* ep) { |
193 | 0 | ExtendedEndPoint* eep = NULL; |
194 | 0 | if (ss->ss_family == AF_INET6 || ss->ss_family == AF_UNIX) { |
195 | 0 | eep = new_extended_endpoint(ss->ss_family); |
196 | 0 | } |
197 | 0 | if (eep) { |
198 | 0 | memcpy(&eep->_u.ss, ss, size); |
199 | 0 | eep->_socklen = size; |
200 | 0 | if (ss->ss_family == AF_UNIX && size == offsetof(sockaddr_un, sun_path)) { |
201 | | // See unix(7): When the address of an unnamed socket is returned, |
202 | | // its length is sizeof(sa_family_t), and sun_path should not be inspected. |
203 | 0 | eep->_u.un.sun_path[0] = '\0'; |
204 | 0 | } |
205 | 0 | eep = dedup(eep); |
206 | 0 | eep->embed_to(ep); |
207 | 0 | } |
208 | 0 | return eep; |
209 | 0 | } |
210 | | |
211 | | // Get ExtendedEndPoint instance from EndPoint |
212 | 0 | static ExtendedEndPoint* address(const EndPoint& ep) { |
213 | 0 | if (!is_extended(ep)) { |
214 | 0 | return NULL; |
215 | 0 | } |
216 | 0 | ::butil::ResourceId<ExtendedEndPoint> id; |
217 | 0 | id.value = ep.ip.s_addr; |
218 | 0 | ExtendedEndPoint* eep = ::butil::address_resource<ExtendedEndPoint>(id); |
219 | 0 | CHECK(eep) << "fail to address ExtendedEndPoint from EndPoint"; |
220 | 0 | return eep; |
221 | 0 | } |
222 | | |
223 | | // Check if an EndPoint has embedded ExtendedEndPoint |
224 | 0 | static bool is_extended(const butil::EndPoint& ep) { |
225 | 0 | return ep.port == EXTENDED_ENDPOINT_PORT; |
226 | 0 | } |
227 | | |
228 | | private: |
229 | | friend class GlobalEndPointSet; |
230 | | |
231 | 0 | static GlobalEndPointSet* global_set() { |
232 | 0 | return GlobalEndPointSet::instance(); |
233 | 0 | } |
234 | | |
235 | 0 | static ExtendedEndPoint* new_extended_endpoint(sa_family_t family) { |
236 | 0 | ::butil::ResourceId<ExtendedEndPoint> id; |
237 | 0 | ExtendedEndPoint* eep = ::butil::get_resource(&id); |
238 | 0 | if (eep) { |
239 | 0 | int64_t old_ref = eep->_ref_count.load(butil::memory_order_relaxed); |
240 | 0 | CHECK(old_ref == 0) << "new ExtendedEndPoint has reference " << old_ref; |
241 | 0 | CHECK(eep->_u.sa.sa_family == AF_UNSPEC) << "new ExtendedEndPoint has family " << eep->_u.sa.sa_family << " set"; |
242 | 0 | eep->_ref_count.store(1, butil::memory_order_relaxed); |
243 | 0 | eep->_id = id; |
244 | 0 | eep->_u.sa.sa_family = family; |
245 | 0 | } |
246 | 0 | return eep; |
247 | 0 | } |
248 | | |
249 | 0 | void embed_to(EndPoint* ep) const { |
250 | 0 | CHECK(0 == _id.value >> 32) << "ResourceId beyond index"; |
251 | 0 | ep->reset(); |
252 | 0 | ep->ip = ip_t{static_cast<uint32_t>(_id.value)}; |
253 | 0 | ep->port = EXTENDED_ENDPOINT_PORT; |
254 | 0 | } |
255 | | |
256 | 0 | static ExtendedEndPoint* dedup(ExtendedEndPoint* eep) { |
257 | 0 | eep->_hash = std::hash<std::string>()(std::string((const char*)&eep->_u, eep->_socklen)); |
258 | |
|
259 | 0 | ExtendedEndPoint* first_eep = global_set()->insert(eep); |
260 | 0 | if (first_eep != eep) { |
261 | 0 | eep->_ref_count.store(0, butil::memory_order_relaxed); |
262 | 0 | eep->_u.sa.sa_family = AF_UNSPEC; |
263 | 0 | ::butil::return_resource(eep->_id); |
264 | 0 | } |
265 | 0 | return first_eep; |
266 | 0 | } |
267 | | |
268 | | public: |
269 | | |
270 | 0 | void dec_ref(void) { |
271 | 0 | int64_t old_ref = _ref_count.fetch_sub(1, butil::memory_order_relaxed); |
272 | 0 | CHECK(old_ref >= 1) << "ExtendedEndPoint has unexpected reference " << old_ref; |
273 | 0 | if (old_ref == 1) { |
274 | 0 | global_set()->erase(this); |
275 | 0 | _u.sa.sa_family = AF_UNSPEC; |
276 | 0 | ::butil::return_resource(_id); |
277 | 0 | } |
278 | 0 | } |
279 | | |
280 | 0 | void inc_ref(void) { |
281 | 0 | int64_t old_ref = _ref_count.fetch_add(1, butil::memory_order_relaxed); |
282 | 0 | CHECK(old_ref >= 1) << "ExtendedEndPoint has unexpected reference " << old_ref; |
283 | 0 | } |
284 | | |
285 | 0 | sa_family_t family(void) const { |
286 | 0 | return _u.sa.sa_family; |
287 | 0 | } |
288 | | |
289 | 0 | int to(sockaddr_storage* ss) const { |
290 | 0 | memcpy(ss, &_u.ss, _socklen); |
291 | 0 | return _socklen; |
292 | 0 | } |
293 | | |
294 | 0 | void to(EndPointStr* ep_str) const { |
295 | 0 | if (_u.sa.sa_family == AF_UNIX) { |
296 | 0 | snprintf(ep_str->_buf, sizeof(ep_str->_buf), "unix:%s", _u.un.sun_path); |
297 | 0 | } else if (_u.sa.sa_family == AF_INET6) { |
298 | 0 | char buf[INET6_ADDRSTRLEN] = {0}; |
299 | 0 | const char* ret = inet_ntop(_u.sa.sa_family, &_u.in6.sin6_addr, buf, sizeof(buf)); |
300 | 0 | CHECK(ret) << "fail to do inet_ntop"; |
301 | 0 | snprintf(ep_str->_buf, sizeof(ep_str->_buf), "[%s]:%d", buf, ntohs(_u.in6.sin6_port)); |
302 | 0 | } else { |
303 | 0 | CHECK(0) << "family " << _u.sa.sa_family << " not supported"; |
304 | 0 | } |
305 | 0 | } |
306 | | |
307 | 0 | int to_hostname(char* host, size_t host_len) const { |
308 | 0 | if (_u.sa.sa_family == AF_UNIX) { |
309 | 0 | snprintf(host, host_len, "unix:%s", _u.un.sun_path); |
310 | 0 | return 0; |
311 | 0 | } else if (_u.sa.sa_family == AF_INET6) { |
312 | 0 | sockaddr_in6 sa = _u.in6; |
313 | 0 | if (getnameinfo((const sockaddr*) &sa, sizeof(sa), host, host_len, NULL, 0, NI_NAMEREQD) != 0) { |
314 | 0 | return -1; |
315 | 0 | } |
316 | 0 | size_t len = ::strlen(host); |
317 | 0 | if (len + 1 < host_len) { |
318 | 0 | snprintf(host + len, host_len - len, ":%d", _u.in6.sin6_port); |
319 | 0 | } |
320 | 0 | return 0; |
321 | 0 | } else { |
322 | 0 | CHECK(0) << "family " << _u.sa.sa_family << " not supported"; |
323 | 0 | return -1; |
324 | 0 | } |
325 | 0 | } |
326 | | |
327 | | private: |
328 | | static const size_t UDS_PATH_SIZE = sizeof(sockaddr_un::sun_path); |
329 | | |
330 | | butil::atomic<int64_t> _ref_count; |
331 | | butil::ResourceId<ExtendedEndPoint> _id; |
332 | | size_t _hash; // pre-compute hash code of sockaddr for saving unordered_set query time |
333 | | socklen_t _socklen; // valid data length of sockaddr |
334 | | union { |
335 | | sockaddr sa; |
336 | | sockaddr_in6 in6; |
337 | | sockaddr_un un; |
338 | | sockaddr_storage ss; |
339 | | } _u; |
340 | | }; |
341 | | |
342 | 0 | inline ExtendedEndPoint* GlobalEndPointSet::insert(ExtendedEndPoint* p) { |
343 | 0 | std::unique_lock<std::mutex> lock(_mutex); |
344 | 0 | auto it = _set.find(p); |
345 | 0 | if (it != _set.end()) { |
346 | 0 | if ((*it)->_ref_count.fetch_add(1, butil::memory_order_relaxed) == 0) { |
347 | | // another thread is calling dec_ref(), do not reuse it |
348 | 0 | (*it)->_ref_count.fetch_sub(1, butil::memory_order_relaxed); |
349 | 0 | _set.erase(it); |
350 | 0 | _set.insert(p); |
351 | 0 | return p; |
352 | 0 | } else { |
353 | | // the ExtendedEndPoint is valid, reuse it |
354 | 0 | return *it; |
355 | 0 | } |
356 | 0 | } |
357 | 0 | _set.insert(p); |
358 | 0 | return p; |
359 | 0 | } |
360 | | |
361 | 0 | inline void GlobalEndPointSet::erase(ExtendedEndPoint* p) { |
362 | 0 | std::unique_lock<std::mutex> lock(_mutex); |
363 | 0 | auto it = _set.find(p); |
364 | 0 | if (it == _set.end() || *it != p) { |
365 | | // another thread has been erase it |
366 | 0 | return; |
367 | 0 | } |
368 | 0 | _set.erase(it); |
369 | 0 | } |
370 | | |
371 | 0 | inline size_t GlobalEndPointSet::Hash::operator()(ExtendedEndPoint* const& p) const { |
372 | 0 | return p->_hash; |
373 | 0 | } |
374 | | |
375 | 0 | inline bool GlobalEndPointSet::Equals::operator()(ExtendedEndPoint* const& p1, ExtendedEndPoint* const& p2) const { |
376 | 0 | return p1->_socklen == p2->_socklen |
377 | 0 | && memcmp(&p1->_u, &p2->_u, p1->_socklen) == 0; |
378 | 0 | } |
379 | | |
380 | | } // namespace details |
381 | | } // namespace butil |
382 | | |
383 | | #endif // BUTIL_DETAILS_EXTENDED_ENDPOINT_H |