/src/brpc/src/brpc/policy/nacos_naming_service.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "nacos_naming_service.h" |
19 | | |
20 | | #include <gflags/gflags.h> |
21 | | |
22 | | #include <set> |
23 | | |
24 | | #include "brpc/http_status_code.h" |
25 | | #include "brpc/log.h" |
26 | | #include "butil/iobuf.h" |
27 | | #include "butil/logging.h" |
28 | | #include "butil/third_party/rapidjson/document.h" |
29 | | |
30 | | namespace brpc { |
31 | | namespace policy { |
32 | | |
33 | | DEFINE_string(nacos_address, "", |
34 | | "The query string of request nacos for discovering service."); |
35 | | DEFINE_string(nacos_service_discovery_path, "/nacos/v1/ns/instance/list", |
36 | | "The url path for discovering service."); |
37 | | DEFINE_string(nacos_service_auth_path, "/nacos/v1/auth/login", |
38 | | "The url path for authentiction."); |
39 | | DEFINE_int32(nacos_connect_timeout_ms, 200, |
40 | | "Timeout for creating connections to nacos in milliseconds"); |
41 | | DEFINE_string(nacos_username, "", "nacos username"); |
42 | | DEFINE_string(nacos_password, "", "nacos password"); |
43 | | DEFINE_string(nacos_load_balancer, "rr", "nacos load balancer name"); |
44 | | |
45 | 0 | int NacosNamingService::Connect() { |
46 | 0 | ChannelOptions opt; |
47 | 0 | opt.protocol = PROTOCOL_HTTP; |
48 | 0 | opt.connect_timeout_ms = FLAGS_nacos_connect_timeout_ms; |
49 | 0 | const int ret = _channel.Init(FLAGS_nacos_address.c_str(), |
50 | 0 | FLAGS_nacos_load_balancer.c_str(), &opt); |
51 | 0 | if (ret != 0) { |
52 | 0 | LOG(ERROR) << "Fail to init channel to nacos at " |
53 | 0 | << FLAGS_nacos_address; |
54 | 0 | } |
55 | 0 | return ret; |
56 | 0 | } |
57 | | |
58 | 0 | int NacosNamingService::RefreshAccessToken(const char *service_name) { |
59 | 0 | Controller cntl; |
60 | 0 | cntl.http_request().uri() = FLAGS_nacos_service_auth_path; |
61 | 0 | cntl.http_request().set_method(brpc::HttpMethod::HTTP_METHOD_POST); |
62 | 0 | cntl.http_request().set_content_type("application/x-www-form-urlencoded"); |
63 | |
|
64 | 0 | auto &buf = cntl.request_attachment(); |
65 | 0 | buf.append("username="); |
66 | 0 | buf.append(FLAGS_nacos_username); |
67 | 0 | buf.append("&password="); |
68 | 0 | buf.append(FLAGS_nacos_password); |
69 | |
|
70 | 0 | _channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr); |
71 | 0 | if (cntl.Failed()) { |
72 | 0 | LOG(ERROR) << "Fail to access " << FLAGS_nacos_service_auth_path << ": " |
73 | 0 | << cntl.ErrorText(); |
74 | 0 | return -1; |
75 | 0 | } |
76 | | |
77 | 0 | BUTIL_RAPIDJSON_NAMESPACE::Document doc; |
78 | 0 | if (doc.Parse(cntl.response_attachment().to_string().c_str()) |
79 | 0 | .HasParseError()) { |
80 | 0 | LOG(ERROR) << "Failed to parse nacos auth response"; |
81 | 0 | return -1; |
82 | 0 | } |
83 | 0 | if (!doc.IsObject()) { |
84 | 0 | LOG(ERROR) << "The nacos's auth response for " << service_name |
85 | 0 | << " is not a json object"; |
86 | 0 | return -1; |
87 | 0 | } |
88 | | |
89 | 0 | auto iter = doc.FindMember("accessToken"); |
90 | 0 | if (iter != doc.MemberEnd() && iter->value.IsString()) { |
91 | 0 | _access_token = iter->value.GetString(); |
92 | 0 | } else { |
93 | 0 | LOG(ERROR) << "The nacos's auth response for " << service_name |
94 | 0 | << " has no accessToken field"; |
95 | 0 | return -1; |
96 | 0 | } |
97 | | |
98 | 0 | auto iter_ttl = doc.FindMember("tokenTtl"); |
99 | 0 | if (iter_ttl != doc.MemberEnd() && iter_ttl->value.IsInt()) { |
100 | 0 | _token_expire_time = time(NULL) + iter_ttl->value.GetInt() - 10; |
101 | 0 | } else { |
102 | 0 | _token_expire_time = 0; |
103 | 0 | } |
104 | |
|
105 | 0 | return 0; |
106 | 0 | } |
107 | | |
108 | | int NacosNamingService::GetServerNodes(const char *service_name, |
109 | | bool token_changed, |
110 | 0 | std::vector<ServerNode> *nodes) { |
111 | 0 | if (_nacos_url.empty() || token_changed) { |
112 | 0 | _nacos_url = FLAGS_nacos_service_discovery_path; |
113 | 0 | _nacos_url += "?"; |
114 | 0 | if (!_access_token.empty()) { |
115 | 0 | _nacos_url += "accessToken=" + _access_token; |
116 | 0 | _nacos_url += "&"; |
117 | 0 | } |
118 | 0 | _nacos_url += service_name; |
119 | 0 | } |
120 | |
|
121 | 0 | Controller cntl; |
122 | 0 | cntl.http_request().uri() = _nacos_url; |
123 | 0 | _channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr); |
124 | 0 | if (cntl.Failed()) { |
125 | 0 | LOG(ERROR) << "Fail to access " << _nacos_url << ": " |
126 | 0 | << cntl.ErrorText(); |
127 | 0 | return -1; |
128 | 0 | } |
129 | 0 | if (cntl.http_response().status_code() != HTTP_STATUS_OK) { |
130 | 0 | LOG(ERROR) << "Failed to request nacos, http status code: " |
131 | 0 | << cntl.http_response().status_code(); |
132 | 0 | return -1; |
133 | 0 | } |
134 | | |
135 | 0 | BUTIL_RAPIDJSON_NAMESPACE::Document doc; |
136 | 0 | if (doc.Parse(cntl.response_attachment().to_string().c_str()) |
137 | 0 | .HasParseError()) { |
138 | 0 | LOG(ERROR) << "Failed to parse nacos response"; |
139 | 0 | return -1; |
140 | 0 | } |
141 | 0 | if (!doc.IsObject()) { |
142 | 0 | LOG(ERROR) << "The nacos's response for " << service_name |
143 | 0 | << " is not a json object"; |
144 | 0 | return -1; |
145 | 0 | } |
146 | | |
147 | 0 | auto it_hosts = doc.FindMember("hosts"); |
148 | 0 | if (it_hosts == doc.MemberEnd()) { |
149 | 0 | LOG(ERROR) << "The nacos's response for " << service_name |
150 | 0 | << " has no hosts member"; |
151 | 0 | return -1; |
152 | 0 | } |
153 | 0 | auto &hosts = it_hosts->value; |
154 | 0 | if (!hosts.IsArray()) { |
155 | 0 | LOG(ERROR) << "hosts member in nacos response is not an array"; |
156 | 0 | return -1; |
157 | 0 | } |
158 | | |
159 | 0 | std::set<ServerNode> presence; |
160 | 0 | for (auto it = hosts.Begin(); it != hosts.End(); ++it) { |
161 | 0 | auto &host = *it; |
162 | 0 | if (!host.IsObject()) { |
163 | 0 | LOG(ERROR) << "host member in nacos response is not an object"; |
164 | 0 | continue; |
165 | 0 | } |
166 | | |
167 | 0 | auto it_ip = host.FindMember("ip"); |
168 | 0 | if (it_ip == host.MemberEnd() || !it_ip->value.IsString()) { |
169 | 0 | LOG(ERROR) << "host in nacos response has not ip"; |
170 | 0 | continue; |
171 | 0 | } |
172 | 0 | auto &ip = it_ip->value; |
173 | |
|
174 | 0 | auto it_port = host.FindMember("port"); |
175 | 0 | if (it_port == host.MemberEnd() || !it_port->value.IsInt()) { |
176 | 0 | LOG(ERROR) << "host in nacos response has not port"; |
177 | 0 | continue; |
178 | 0 | } |
179 | 0 | auto &port = it_port->value; |
180 | |
|
181 | 0 | auto it_enabled = host.FindMember("enabled"); |
182 | 0 | if (it_enabled == host.MemberEnd() || !(it_enabled->value.IsBool()) || |
183 | 0 | !(it_enabled->value.GetBool())) { |
184 | 0 | LOG(INFO) << "nacos " << ip.GetString() << ":" << port.GetInt() |
185 | 0 | << " is not enabled"; |
186 | 0 | continue; |
187 | 0 | } |
188 | | |
189 | 0 | auto it_healthy = host.FindMember("healthy"); |
190 | 0 | if (it_healthy == host.MemberEnd() || !(it_healthy->value.IsBool()) || |
191 | 0 | !(it_healthy->value.GetBool())) { |
192 | 0 | LOG(INFO) << "nacos " << ip.GetString() << ":" << port.GetInt() |
193 | 0 | << " is not healthy"; |
194 | 0 | continue; |
195 | 0 | } |
196 | | |
197 | 0 | butil::EndPoint end_point; |
198 | 0 | if (str2endpoint(ip.GetString(), port.GetUint(), &end_point) != 0) { |
199 | 0 | LOG(ERROR) << "ncos service with illegal address or port: " |
200 | 0 | << ip.GetString() << ":" << port.GetUint(); |
201 | 0 | continue; |
202 | 0 | } |
203 | | |
204 | 0 | ServerNode node(end_point); |
205 | 0 | auto it_weight = host.FindMember("weight"); |
206 | 0 | if (it_weight != host.MemberEnd() && it_weight->value.IsNumber()) { |
207 | 0 | node.tag = |
208 | 0 | std::to_string(static_cast<long>(it_weight->value.GetDouble())); |
209 | 0 | } |
210 | |
|
211 | 0 | presence.insert(node); |
212 | 0 | } |
213 | |
|
214 | 0 | nodes->reserve(presence.size()); |
215 | 0 | nodes->assign(presence.begin(), presence.end()); |
216 | |
|
217 | 0 | if (nodes->empty() && hosts.Size() != 0) { |
218 | 0 | LOG(ERROR) << "All service about " << service_name |
219 | 0 | << " from nacos is invalid, refuse to update servers"; |
220 | 0 | return -1; |
221 | 0 | } |
222 | | |
223 | 0 | RPC_VLOG << "Got " << nodes->size() |
224 | 0 | << (nodes->size() > 1 ? " servers" : " server") << " from " |
225 | 0 | << service_name; |
226 | |
|
227 | 0 | auto it_cache = doc.FindMember("cacheMillis"); |
228 | 0 | if (it_cache != doc.MemberEnd() && it_cache->value.IsInt64()) { |
229 | 0 | _cache_ms = it_cache->value.GetInt64(); |
230 | 0 | } |
231 | |
|
232 | 0 | return 0; |
233 | 0 | } |
234 | | |
235 | | NacosNamingService::NacosNamingService() |
236 | 0 | : _nacos_connected(false), _cache_ms(-1), _token_expire_time(0) {} |
237 | | |
238 | 0 | int NacosNamingService::GetNamingServiceAccessIntervalMs() const { |
239 | 0 | if (0 < _cache_ms) { |
240 | 0 | return _cache_ms; |
241 | 0 | } |
242 | 0 | return PeriodicNamingService::GetNamingServiceAccessIntervalMs(); |
243 | 0 | } |
244 | | |
245 | | int NacosNamingService::GetServers(const char *service_name, |
246 | 0 | std::vector<ServerNode> *servers) { |
247 | 0 | if (!_nacos_connected) { |
248 | 0 | const int ret = Connect(); |
249 | 0 | if (0 == ret) { |
250 | 0 | _nacos_connected = true; |
251 | 0 | } else { |
252 | 0 | return ret; |
253 | 0 | } |
254 | 0 | } |
255 | | |
256 | 0 | const bool authentiction_enabled = |
257 | 0 | !FLAGS_nacos_username.empty() && !FLAGS_nacos_password.empty(); |
258 | 0 | const bool has_invalid_access_token = |
259 | 0 | _access_token.empty() || |
260 | 0 | (0 < _token_expire_time && _token_expire_time <= time(NULL)); |
261 | 0 | bool token_changed = false; |
262 | |
|
263 | 0 | if (authentiction_enabled && has_invalid_access_token) { |
264 | 0 | const int ret = RefreshAccessToken(service_name); |
265 | 0 | if (ret == 0) { |
266 | 0 | token_changed = true; |
267 | 0 | } else { |
268 | 0 | return ret; |
269 | 0 | } |
270 | 0 | } |
271 | | |
272 | 0 | servers->clear(); |
273 | 0 | return GetServerNodes(service_name, token_changed, servers); |
274 | 0 | } |
275 | | |
276 | | void NacosNamingService::Describe(std::ostream &os, |
277 | 0 | const DescribeOptions &) const { |
278 | 0 | os << "nacos"; |
279 | 0 | return; |
280 | 0 | } |
281 | | |
282 | 0 | NamingService *NacosNamingService::New() const { |
283 | 0 | return new NacosNamingService; |
284 | 0 | } |
285 | | |
286 | 0 | void NacosNamingService::Destroy() { delete this; } |
287 | | |
288 | | } // namespace policy |
289 | | } // namespace brpc |